From ed8a99b5d8462e334f22f7dc07584eeec5a128ca Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Fri, 16 Jan 2026 01:44:47 +0100 Subject: [PATCH] refactor(common): consolidate catalog construction into submodules Centralize catalog construction logic from scattered locations into organized submodules under `common::catalog` to improve code organization and reduce duplication. - Move `Function`/`FunctionSource` from `datasets-derived` to `datasets-common` to break circular dependencies - Create `catalog::query_server` for Arrow Flight catalog functions - Create `catalog::dump_derived_dataset` for derived dataset dump catalog - Create `catalog::schema_inference` for admin API schema validation - Create `catalog::validation_derived_dataset` for manifest validation Signed-off-by: Lorenzo Delgado --- Cargo.lock | 2 + crates/core/common/src/catalog.rs | 8 +- .../src/catalog/dump_derived_dataset.rs | 553 +++++++++++ crates/core/common/src/catalog/errors.rs | 267 ------ .../core/common/src/catalog/query_server.rs | 518 ++++++++++ .../common/src/catalog/schema_inference.rs | 448 +++++++++ crates/core/common/src/catalog/sql.rs | 561 ----------- .../src/catalog/validation_derived_dataset.rs | 399 ++++++++ crates/core/datasets-common/src/manifest.rs | 26 + crates/core/datasets-derived/src/catalog.rs | 902 ------------------ crates/core/datasets-derived/src/lib.rs | 1 - crates/core/datasets-derived/src/logical.rs | 139 +-- crates/core/datasets-derived/src/manifest.rs | 31 +- crates/core/dump/Cargo.toml | 20 +- crates/core/dump/src/derived_dataset.rs | 35 +- .../services/admin-api/src/handlers/schema.rs | 142 ++- crates/services/server/Cargo.toml | 2 + crates/services/server/src/flight.rs | 71 +- tests/src/tests/it_admin_api_schema.rs | 4 +- tests/src/tests/it_reorg.rs | 30 +- 20 files changed, 2207 insertions(+), 1952 deletions(-) create mode 100644 crates/core/common/src/catalog/dump_derived_dataset.rs delete mode 100644 crates/core/common/src/catalog/errors.rs create mode 100644 crates/core/common/src/catalog/query_server.rs create mode 100644 crates/core/common/src/catalog/schema_inference.rs delete mode 100644 crates/core/common/src/catalog/sql.rs create mode 100644 crates/core/common/src/catalog/validation_derived_dataset.rs delete mode 100644 crates/core/datasets-derived/src/catalog.rs diff --git a/Cargo.lock b/Cargo.lock index 8239a7d26..2595551bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9276,8 +9276,10 @@ dependencies = [ "bytes", "common", "datafusion", + "datasets-common", "dump", "futures", + "js-runtime", "metadata-db", "monitoring", "prost 0.13.5", diff --git a/crates/core/common/src/catalog.rs b/crates/core/common/src/catalog.rs index 0d12e6724..8dad50aa1 100644 --- a/crates/core/common/src/catalog.rs +++ b/crates/core/common/src/catalog.rs @@ -1,6 +1,10 @@ pub mod dataset_access; -pub mod errors; pub mod logical; pub mod physical; pub mod reader; -pub mod sql; + +// Catalog construction submodules +pub mod dump_derived_dataset; +pub mod query_server; +pub mod schema_inference; +pub mod validation_derived_dataset; diff --git a/crates/core/common/src/catalog/dump_derived_dataset.rs b/crates/core/common/src/catalog/dump_derived_dataset.rs new file mode 100644 index 000000000..9c23e7839 --- /dev/null +++ b/crates/core/common/src/catalog/dump_derived_dataset.rs @@ -0,0 +1,553 @@ +//! Derived dataset catalog types and catalog creation functions. +//! +//! This module provides catalog creation specifically for derived dataset execution. +//! 
It works with pre-resolved dataset dependencies (DepAlias → Hash mappings) for +//! deterministic, reproducible derived dataset dumps. + +use std::{ + collections::{BTreeMap, btree_map::Entry}, + sync::Arc, +}; + +use amp_data_store::DataStore; +use datafusion::logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}; +use datasets_common::{ + deps::alias::{DepAlias, DepAliasOrSelfRef}, + func_name::{ETH_CALL_FUNCTION_NAME, FuncName}, + hash::Hash, + hash_reference::HashReference, + manifest::Function, + table_name::TableName, +}; +use js_runtime::isolate_pool::IsolatePool; + +use crate::{ + BoxError, ResolvedTable, + catalog::{ + dataset_access::DatasetAccess, + logical::LogicalCatalog, + physical::{Catalog, PhysicalTable}, + }, + js_udf::JsUdf, + sql::{FunctionReference, TableReference}, +}; + +/// Resolved SQL references tuple (table refs, function refs) for derived dataset execution. +pub type ResolvedReferences = ( + Vec>, + Vec>, +); + +/// Creates a full catalog with physical data access for derived dataset dumps. +/// +/// This function builds a complete catalog containing: +/// - **Physical tables**: Table metadata with parquet file locations from the data store +/// - **Logical catalog**: Table schemas and UDFs required for SQL query planning +/// +/// The catalog enables deterministic, reproducible derived dataset execution by using +/// pre-resolved dataset dependencies (DepAlias → Hash mappings) from the dataset manifest. +/// +/// ## Where Used +/// +/// Called exclusively by `dump_derived_dataset` in `crates/core/dump/src/derived_dataset.rs` +/// during the dump execution phase (NOT during validation). +/// +/// ## Dependency Resolution +/// +/// Unlike validation functions that work with unresolved dependencies: +/// - All dependencies MUST be pre-resolved to `HashReference` (content-addressable hashes) +/// - Uses locked dataset hashes from the manifest for deterministic execution +/// - No dynamic resolution or "latest" version lookups occur +/// - Ensures reproducible dumps across different executions +/// +/// ## Implementation +/// +/// The function: +/// 1. Calls [`create_logical`] to resolve datasets and build the logical catalog +/// 2. Queries metadata database for physical parquet locations +/// 3. Constructs physical catalog for query execution +pub async fn create_phy( + dataset_store: &impl DatasetAccess, + data_store: &DataStore, + isolate_pool: &IsolatePool, + manifest_deps: &BTreeMap, + manifest_udfs: &BTreeMap, + refs: ResolvedReferences, +) -> Result { + // Get logical catalog (tables + UDFs) + let logical = create_logical( + dataset_store, + isolate_pool, + manifest_deps, + manifest_udfs, + refs, + ) + .await + .map_err(CreateCatalogError::CreateLogical)?; + + // Build physical catalog from resolved tables + let mut physical_tables = Vec::new(); + for table in &logical.tables { + let dataset_ref = table.dataset_reference(); + let table_name = table.name(); + + let revision = data_store + .get_table_active_revision(dataset_ref, table_name) + .await + .map_err(|err| CreateCatalogError::PhysicalTableRetrieval { + dataset: dataset_ref.clone(), + table: table_name.clone(), + source: err, + })? 
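+            // `get_table_active_revision` returns `None` when the table has never been
+            // dumped; the `.ok_or` below surfaces that as `TableNotSynced`.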
+ .ok_or(CreateCatalogError::TableNotSynced { + dataset: dataset_ref.clone(), + table: table_name.clone(), + })?; + + let physical_table = PhysicalTable::from_active_revision( + data_store.clone(), + table.dataset_reference().clone(), + table.dataset_start_block(), + table.table().clone(), + revision, + table.sql_table_ref_schema().to_string(), + ); + physical_tables.push(physical_table.into()); + } + + Ok(Catalog::new(physical_tables, logical)) +} + +/// Creates a logical catalog for derived dataset SQL validation without physical data access. +/// +/// This function builds a logical catalog with schemas only, enabling query plan generation +/// and schema inference without accessing physical parquet files. +/// +/// ## Where Used +/// +/// This function is used during derived dataset dump execution when only logical validation +/// is needed (such as during query planning phases). +/// +/// ## Implementation +/// +/// The function: +/// 1. Destructures the references tuple into table and function references +/// 2. Resolves table references to ResolvedTable instances using pre-resolved dependencies +/// 3. Resolves function references to ScalarUDF instances +/// 4. Returns a LogicalCatalog containing tables and UDFs +pub async fn create_logical( + dataset_store: &impl DatasetAccess, + isolate_pool: &IsolatePool, + manifest_deps: &BTreeMap, + manifest_udfs: &BTreeMap, + refs: ResolvedReferences, +) -> Result { + let (table_refs, func_refs) = refs; + + let tables = resolve_tables(dataset_store, manifest_deps, table_refs) + .await + .map_err(CreateLogicalCatalogError::ResolveTables)?; + let udfs = resolve_udfs( + dataset_store, + isolate_pool, + manifest_deps, + manifest_udfs, + func_refs, + ) + .await + .map_err(CreateLogicalCatalogError::ResolveUdfs)?; + + Ok(LogicalCatalog { tables, udfs }) +} + +/// Resolves table references to ResolvedTable instances using pre-resolved dependencies. +/// +/// Processes each table reference, looks up the dataset by hash, finds the table +/// within the dataset, and creates a ResolvedTable for catalog construction. +async fn resolve_tables( + dataset_store: &impl DatasetAccess, + manifest_deps: &BTreeMap, + refs: impl IntoIterator>, +) -> Result, ResolveTablesError> { + // Use hash-based map to deduplicate datasets and collect resolved tables + // Inner map: table_ref -> ResolvedTable (deduplicates table references) + let mut tables: BTreeMap, ResolvedTable>> = + BTreeMap::new(); + + for table_ref in refs { + match &table_ref { + TableReference::Bare { .. 
} => { + return Err(ResolveTablesError::UnqualifiedTable { + table_ref: table_ref.to_string(), + }); + } + TableReference::Partial { schema, table } => { + // Schema is already parsed as DepAlias, lookup in dependencies map + let dataset_ref = manifest_deps.get(schema.as_ref()).ok_or_else(|| { + ResolveTablesError::DependencyAliasNotFound { + alias: schema.as_ref().clone(), + } + })?; + + // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) + let Entry::Vacant(entry) = tables + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(table_ref.clone()) + else { + continue; + }; + + // Load dataset by hash (cached by dataset_store) + let dataset = dataset_store + .get_dataset(dataset_ref) + .await + .map_err(|err| ResolveTablesError::GetDataset { + reference: dataset_ref.clone(), + source: err, + })?; + + // Find table in dataset + let dataset_table = dataset + .tables + .iter() + .find(|t| t.name() == table) + .ok_or_else(|| ResolveTablesError::TableNotFoundInDataset { + table_name: table.as_ref().clone(), + reference: dataset_ref.clone(), + })?; + + // Create ResolvedTable + let resolved_table = ResolvedTable::new( + dataset_table.clone(), + schema.to_string(), + dataset_ref.clone(), + dataset.start_block, + ); + + // Insert into vacant entry + entry.insert(resolved_table); + } + } + } + + // Flatten to Vec + Ok(tables + .into_values() + .flat_map(|map| map.into_values()) + .collect()) +} + +/// Resolves function references to ScalarUDF instances using pre-resolved dependencies. +/// +/// Processes each function reference: +/// - For external dependencies (dep.function): loads dataset and retrieves UDF +/// - For self-references (self.function): creates JsUdf from the manifest's function definition +/// - Skips bare functions (built-in DataFusion/Amp functions) +async fn resolve_udfs( + dataset_store: &impl DatasetAccess, + isolate_pool: &IsolatePool, + manifest_deps: &BTreeMap, + manifest_udfs: &BTreeMap, + refs: impl IntoIterator>, +) -> Result, ResolveUdfsError> { + // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference + // Inner map ensures deduplication: multiple function references to the same UDF share one instance + let mut udfs: BTreeMap, ScalarUDF>> = + BTreeMap::new(); + // Track UDFs defined in this manifest (bare functions and self-references) - separate from dependency functions + // Ensures deduplication: multiple references to the same function share one instance + let mut self_udfs: BTreeMap, ScalarUDF> = BTreeMap::new(); + + for func_ref in refs { + match &func_ref { + // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion) + FunctionReference::Bare { function: _ } => continue, + FunctionReference::Qualified { schema, function } => { + // Match on schema type: DepAlias (external dependency) or SelfRef (same-dataset function) + match schema.as_ref() { + DepAliasOrSelfRef::DepAlias(dep_alias) => { + // External dependency reference - lookup in dependencies map + let dataset_ref = manifest_deps.get(dep_alias).ok_or_else(|| { + ResolveUdfsError::DependencyAliasNotFound { + alias: dep_alias.clone(), + } + })?; + + // Check vacancy BEFORE loading dataset + let Entry::Vacant(entry) = udfs + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(func_ref.clone()) + else { + continue; + }; + + // Only load dataset if UDF not already resolved + let dataset = + dataset_store + .get_dataset(dataset_ref) + .await + .map_err(|err| ResolveUdfsError::GetDataset 
{ + reference: dataset_ref.clone(), + source: err, + })?; + + // Get the UDF for this function reference + let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { + dataset_store + .eth_call_for_dataset(&schema.to_string(), &dataset) + .await + .map_err(|err| ResolveUdfsError::EthCallUdfCreation { + reference: dataset_ref.clone(), + source: err, + })? + .ok_or_else(|| ResolveUdfsError::EthCallNotAvailable { + reference: dataset_ref.clone(), + })? + } else { + dataset + .function_by_name( + schema.to_string(), + function, + isolate_pool.clone(), + ) + .ok_or_else(|| ResolveUdfsError::FunctionNotFoundInDataset { + function_name: (**function).clone(), + reference: dataset_ref.clone(), + })? + }; + + entry.insert(udf); + } + DepAliasOrSelfRef::SelfRef => { + // Same-dataset function reference (self.function_name) + // Look up function in the functions map (defined in this dataset) + let func_def = manifest_udfs.get(function).ok_or_else(|| { + ResolveUdfsError::SelfReferencedFunctionNotFound { + function_name: (**function).clone(), + } + })?; + + // Skip if function reference is already resolved (optimization) + let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else { + continue; + }; + + // Create UDF from Function definition using JsUdf + // Use "self" as schema qualifier to preserve case sensitivity + let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new( + isolate_pool.clone(), + Some(datasets_common::deps::alias::SELF_REF_KEYWORD.to_string()), // Schema = "self" + func_def.source.source.clone(), + func_def.source.filename.clone().into(), + Arc::from(function.as_ref().as_str()), + func_def + .input_types + .iter() + .map(|dt| dt.clone().into_arrow()) + .collect(), + func_def.output_type.clone().into_arrow(), + ))) + .into_scalar_udf(); + + entry.insert(udf); + } + } + } + } + } + + // Flatten and combine UDFs + Ok(self_udfs + .into_values() + .chain(udfs.into_values().flat_map(|map| map.into_values())) + .collect()) +} + +/// Errors that can occur when creating a derived dataset catalog. +/// +/// Returned by [`create_phy`] when catalog creation fails. +#[derive(Debug, thiserror::Error)] +pub enum CreateCatalogError { + /// Failed to create logical catalog. + /// + /// This occurs when: + /// - Dataset references cannot be resolved + /// - Tables or functions are not found in datasets + #[error("Failed to create logical catalog")] + CreateLogical(#[source] CreateLogicalCatalogError), + + /// Failed to retrieve physical table metadata from the metadata database. + /// + /// This occurs when querying the metadata database for the active physical + /// location of a table fails due to database connection issues, query errors, + /// or other database-related problems. + #[error("Failed to retrieve physical table metadata for table {dataset}.{table}")] + PhysicalTableRetrieval { + /// The hash reference of the dataset containing the table + dataset: HashReference, + /// The name of the table for which metadata retrieval failed + table: TableName, + #[source] + source: amp_data_store::GetTableActiveRevisionError, + }, + + /// Table has not been synced and no physical location exists. + /// + /// This occurs when attempting to load a physical catalog for a table that + /// has been defined but has not yet been dumped/synced to storage. The table + /// exists in the dataset definition but has no physical parquet files. 
+ #[error("Table {dataset}.{table} has not been synced")] + TableNotSynced { + /// The hash reference of the dataset containing the table + dataset: HashReference, + /// The name of the table that has not been synced + table: TableName, + }, +} + +/// Errors specific to create_logical operations. +/// +/// This error type is used by `create_logical()` to create +/// a logical catalog for derived dataset execution. +#[derive(Debug, thiserror::Error)] +pub enum CreateLogicalCatalogError { + /// Failed to resolve table references to ResolvedTable instances. + #[error(transparent)] + ResolveTables(ResolveTablesError), + + /// Failed to resolve function references to UDF instances. + #[error(transparent)] + ResolveUdfs(ResolveUdfsError), +} + +/// Errors that can occur when resolving table references with dependencies. +#[derive(Debug, thiserror::Error)] +pub enum ResolveTablesError { + /// Table is not qualified with a schema/dataset name. + /// + /// All tables must be qualified with a dataset reference in the schema portion. + /// Unqualified tables (e.g., just `table_name`) are not allowed. + #[error("Unqualified table '{table_ref}', all tables must be qualified with a dataset")] + UnqualifiedTable { + /// The unqualified table reference string + table_ref: String, + }, + + /// Dependency alias not found when processing table reference. + /// + /// This occurs when a table reference uses an alias that was not provided + /// in the dependencies map. + #[error( + "Dependency alias '{alias}' referenced in table reference but not provided in dependencies" + )] + DependencyAliasNotFound { + /// The dependency alias that was not found in the dependencies map + alias: DepAlias, + }, + + /// Failed to retrieve dataset from store when loading dataset for table reference. + /// + /// This occurs when loading a dataset definition fails: + /// - Dataset not found in the store + /// - Dataset manifest is invalid or corrupted + /// - Unsupported dataset kind + /// - Storage backend errors when reading the dataset + #[error("Failed to retrieve dataset '{reference}' for table reference")] + GetDataset { + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Table not found in dataset. + /// + /// This occurs when the table name is referenced in the SQL query but the + /// dataset does not contain a table with that name. + #[error("Table '{table_name}' not found in dataset '{reference}'")] + TableNotFoundInDataset { + /// The name of the table that was not found + table_name: TableName, + /// The hash reference of the dataset that was searched + reference: HashReference, + }, +} + +/// Errors that can occur when resolving UDF references with dependencies. +#[derive(Debug, thiserror::Error)] +pub enum ResolveUdfsError { + /// Dependency alias not found when processing function reference. + /// + /// This occurs when a function reference uses an alias that was not provided + /// in the dependencies map. + #[error( + "Dependency alias '{alias}' referenced in function reference but not provided in dependencies" + )] + DependencyAliasNotFound { + /// The dependency alias that was not found in the dependencies map + alias: DepAlias, + }, + + /// Failed to retrieve dataset from store when loading dataset for function. 
+ /// + /// This occurs when loading a dataset definition for a function fails: + /// - Dataset not found in the store + /// - Dataset manifest is invalid or corrupted + /// - Unsupported dataset kind + /// - Storage backend errors when reading the dataset + #[error("Failed to retrieve dataset '{reference}' for function reference")] + GetDataset { + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Failed to create ETH call UDF for dataset referenced in function name. + /// + /// This occurs when creating the eth_call user-defined function for a function fails: + /// - Invalid provider configuration for the dataset + /// - Provider connection issues + /// - Dataset is not an EVM RPC dataset but eth_call was requested + #[error("Failed to create ETH call UDF for dataset '{reference}' for function reference")] + EthCallUdfCreation { + /// The hash reference of the dataset for which eth_call UDF creation failed + reference: HashReference, + #[source] + source: BoxError, + }, + + /// eth_call function not available for dataset. + /// + /// This occurs when the eth_call function is referenced in SQL but the + /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured). + #[error("Function 'eth_call' not available for dataset '{reference}'")] + EthCallNotAvailable { + /// The hash reference of the dataset that does not support eth_call + reference: HashReference, + }, + + /// Function not found in dataset. + /// + /// This occurs when a function is referenced in the SQL query but the + /// dataset does not contain a function with that name. + #[error("Function '{function_name}' not found in dataset '{reference}'")] + FunctionNotFoundInDataset { + /// The name of the function that was not found + function_name: FuncName, + /// The hash reference of the dataset that was searched + reference: HashReference, + }, + + /// Self-referenced function not found in manifest's functions map. + /// + /// This occurs when a SQL query uses `self.function_name` syntax but the + /// function is not defined in the manifest's `functions` section. + #[error("Self-referenced function '{function_name}' not found in manifest functions")] + SelfReferencedFunctionNotFound { + /// The function name that was referenced but not defined + function_name: FuncName, + }, +} diff --git a/crates/core/common/src/catalog/errors.rs b/crates/core/common/src/catalog/errors.rs deleted file mode 100644 index 396cfc111..000000000 --- a/crates/core/common/src/catalog/errors.rs +++ /dev/null @@ -1,267 +0,0 @@ -use datasets_common::{ - hash_reference::HashReference, partial_reference::PartialReferenceError, reference::Reference, - table_name::TableName, -}; - -use crate::{ - BoxError, - sql::{ResolveFunctionReferencesError, ResolveTableReferencesError}, -}; - -#[derive(Debug, thiserror::Error)] -pub enum CatalogForSqlError { - /// Failed to resolve table references from the SQL statement. - /// - /// This occurs when: - /// - Table references contain invalid identifiers - /// - Table references have unsupported format (not 1-3 parts) - /// - Table names don't conform to identifier rules - /// - Schema portion fails to parse as PartialReference - #[error("Failed to resolve table references from SQL")] - TableReferenceResolution(#[source] ResolveTableReferencesError), - - /// Failed to extract function names from the SQL statement. 
- /// - /// This occurs when: - /// - The SQL statement contains DML operations (CreateExternalTable, CopyTo) - /// - An EXPLAIN statement wraps an unsupported statement type - /// - Schema portion fails to parse as PartialReference - #[error("Failed to resolve function references from SQL")] - FunctionReferenceResolution(#[source] ResolveFunctionReferencesError), - - /// Failed to get the logical catalog. - /// - /// This wraps errors from `get_logical_catalog`, which can occur when: - /// - Dataset names cannot be extracted from table references or function names - /// - Dataset retrieval fails - /// - UDF creation fails - #[error("Failed to get logical catalog")] - GetLogicalCatalog(#[source] GetLogicalCatalogError), - - /// Failed to retrieve physical table metadata from the metadata database. - /// - /// This occurs when querying the metadata database for the active physical - /// location of a table fails due to database connection issues, query errors, - /// or other database-related problems. - #[error("Failed to retrieve physical table metadata for table {dataset}.{table}")] - PhysicalTableRetrieval { - dataset: HashReference, - table: TableName, - #[source] - source: amp_data_store::GetTableActiveRevisionError, - }, - - /// Table has not been synced and no physical location exists. - /// - /// This occurs when attempting to load a physical catalog for a table that - /// has been defined but has not yet been dumped/synced to storage. The table - /// exists in the dataset definition but has no physical parquet files. - #[error("Table {dataset}.{table} has not been synced")] - TableNotSynced { - dataset: HashReference, - table: TableName, - }, -} - -impl CatalogForSqlError { - /// Returns true if this error is due to a table not being found in a dataset. - pub fn is_table_not_found(&self) -> bool { - matches!( - self, - CatalogForSqlError::GetLogicalCatalog( - GetLogicalCatalogError::TableNotFoundInDataset { .. } - ) - ) - } -} - -/// Errors specific to planning_ctx_for_sql_tables_with_deps operations -/// -/// This error type is used exclusively by `planning_ctx_for_sql_tables_with_deps()` to create -/// a planning context for SQL tables with external dependencies. -#[derive(Debug, thiserror::Error)] -pub enum PlanningCtxForSqlError { - /// Failed to resolve table references from the SQL statement. - /// - /// This occurs when: - /// - Table references contain invalid identifiers - /// - Table references have unsupported format (not 1-3 parts) - /// - Table names don't conform to identifier rules - /// - Schema portion fails to parse as PartialReference - #[error("Failed to resolve table references from SQL")] - TableReferenceResolution(#[source] ResolveTableReferencesError), - - /// Failed to extract function names from the SQL statement. - /// - /// This occurs when analyzing the SQL AST to find function calls: - /// - The SQL statement contains DML operations (CreateExternalTable, CopyTo) - /// - An EXPLAIN statement wraps an unsupported statement type - /// - A function name has an invalid format (more than 2 parts) - /// - Schema portion fails to parse as PartialReference - #[error("Failed to resolve function references from SQL")] - FunctionReferenceResolution(#[source] ResolveFunctionReferencesError), - - /// Table is not qualified with a schema/dataset name. - /// - /// All tables must be qualified with a dataset reference in the schema portion. - /// Unqualified tables (e.g., just `table_name`) are not allowed. 
- #[error("Unqualified table '{table_ref}', all tables must be qualified with a dataset")] - UnqualifiedTable { table_ref: String }, - - /// Failed to resolve dataset reference to a hash reference. - /// - /// This occurs when the dataset store cannot resolve a reference to its - /// corresponding content hash. Common causes include: - /// - Dataset does not exist in the store - /// - Version tag not found - /// - Storage backend errors - /// - Invalid reference format - /// - Database connection issues - #[error("Failed to resolve dataset reference '{reference}'")] - ResolveDatasetReference { - reference: Reference, - #[source] - source: BoxError, - }, - - /// Failed to load dataset from the dataset store. - /// - /// This occurs when loading a dataset definition fails. Common causes include: - /// - Dataset does not exist in the store - /// - Dataset manifest is invalid or corrupted - /// - Unsupported dataset kind - /// - Storage backend errors when reading the dataset - /// - Manifest file not found in object store - #[error("Failed to load dataset '{reference}'")] - LoadDataset { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Failed to create ETH call UDF for an EVM RPC dataset. - /// - /// This occurs when creating the eth_call user-defined function for a dataset: - /// - Invalid provider configuration for the dataset - /// - Provider connection issues - /// - Dataset is not an EVM RPC dataset but eth_call was requested - #[error("Failed to create ETH call UDF for dataset '{reference}'")] - EthCallUdfCreation { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Table not found in dataset. - /// - /// This occurs when the table name is referenced in the SQL query but the - /// dataset does not contain a table with that name. - #[error("Table '{table_name}' not found in dataset '{reference}'")] - TableNotFoundInDataset { - table_name: TableName, - reference: HashReference, - }, - - /// Function not found in dataset. - /// - /// This occurs when a function is referenced in the SQL query but the - /// dataset does not contain a function with that name. - #[error("Function '{function_name}' not found in dataset '{reference}'")] - FunctionNotFoundInDataset { - function_name: String, - reference: HashReference, - }, - - /// eth_call function not available for dataset. - /// - /// This occurs when the eth_call function is referenced in SQL but the - /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured). - #[error("Function 'eth_call' not available for dataset '{reference}'")] - EthCallNotAvailable { reference: HashReference }, -} - -/// Errors specific to catalog_for_sql_with_deps operations -/// -/// This error type is used exclusively by `catalog_for_sql_with_deps()` to create -/// a physical catalog for SQL query execution with pre-resolved dependencies. -#[derive(Debug, thiserror::Error)] -#[allow(clippy::large_enum_variant)] -pub enum GetLogicalCatalogError { - /// Table is not qualified with a schema/dataset name. - /// - /// All tables must be qualified with a dataset reference in the schema portion. - /// Unqualified tables (e.g., just `table_name`) are not allowed. - #[error("Unqualified table '{table_ref}', all tables must be qualified with a dataset")] - UnqualifiedTable { table_ref: String }, - - /// Failed to resolve dataset reference to a hash reference. - /// - /// This occurs when the dataset store cannot resolve a reference to its - /// corresponding content hash. 
Common causes include: - /// - Dataset does not exist in the store - /// - Version tag not found - /// - Storage backend errors - /// - Invalid reference format - /// - Database connection issues - #[error("Failed to resolve dataset reference '{reference}'")] - ResolveDatasetReference { - reference: Reference, - #[source] - source: BoxError, - }, - - /// Failed to load dataset from the dataset store. - /// - /// This occurs when loading a dataset definition fails. Common causes include: - /// - Dataset does not exist in the store - /// - Dataset manifest is invalid or corrupted - /// - Unsupported dataset kind - /// - Storage backend errors when reading the dataset - /// - Manifest file not found in object store - #[error("Failed to load dataset '{reference}'")] - LoadDataset { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Failed to create ETH call UDF for an EVM RPC dataset. - /// - /// This occurs when creating the eth_call user-defined function for a dataset: - /// - Invalid provider configuration for the dataset - /// - Provider connection issues - /// - Dataset is not an EVM RPC dataset but eth_call was requested - #[error("Failed to create ETH call UDF for dataset '{reference}'")] - EthCallUdfCreation { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Function not found in dataset. - /// - /// This occurs when a function is referenced in the SQL query but the - /// dataset does not contain a function with that name. - #[error("Function '{function_name}' not found in dataset '{reference}'")] - FunctionNotFoundInDataset { - function_name: String, - reference: HashReference, - }, - - /// eth_call function not available for dataset. - /// - /// This occurs when the eth_call function is referenced in SQL but the - /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured). - #[error("Function 'eth_call' not available for dataset '{reference}'")] - EthCallNotAvailable { reference: HashReference }, - - /// Table not found in dataset. - /// - /// This occurs when the table name is referenced in the SQL query but the - /// dataset does not contain a table with that name. - #[error("Table '{table_name}' not found in dataset '{reference}'")] - TableNotFoundInDataset { - table_name: TableName, - reference: HashReference, - }, -} diff --git a/crates/core/common/src/catalog/query_server.rs b/crates/core/common/src/catalog/query_server.rs new file mode 100644 index 000000000..81f1d04ad --- /dev/null +++ b/crates/core/common/src/catalog/query_server.rs @@ -0,0 +1,518 @@ +//! Arrow Flight catalog creation for SQL queries. +//! +//! This module provides catalog-building functions exclusively for the Arrow Flight server, +//! handling both query planning (GetFlightInfo) and query execution (DoGet) phases. +//! +//! ## Functions +//! +//! | Function | Purpose | Arrow Flight Operation | +//! |----------|---------|------------------------| +//! | [`create_phy`] | Physical catalog with parquet locations | DoGet (execution) | +//! | [`create_logical`] | Logical catalog with schemas only | GetFlightInfo (planning) | +//! +//! ## Resolution Strategy +//! +//! Both functions use **dynamic resolution** - dataset references are resolved to content +//! hashes at query time, supporting version tags, "latest" revision, and direct hash references. 
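+//!
+//! ## Example
+//!
+//! An illustrative sketch of the two-phase flow, assuming a `dataset_store` (implementing
+//! [`DatasetAccess`]), a `data_store`, and an `isolate_pool` are already constructed, and
+//! that `refs` was produced by the SQL reference-resolution helpers in [`crate::sql`]:
+//!
+//! ```ignore
+//! // GetFlightInfo: logical catalog only (schemas and UDFs, no parquet locations)
+//! let logical = create_logical(&dataset_store, &isolate_pool, refs.clone()).await?;
+//!
+//! // DoGet: full catalog with physical parquet locations for execution
+//! let catalog = create_phy(&dataset_store, &data_store, &isolate_pool, refs).await?;
+//! ```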
+ +use std::{ + collections::{BTreeMap, btree_map::Entry}, + sync::Arc, +}; + +use amp_data_store::DataStore; +use datafusion::logical_expr::ScalarUDF; +use datasets_common::{ + func_name::ETH_CALL_FUNCTION_NAME, hash::Hash, hash_reference::HashReference, + partial_reference::PartialReference, reference::Reference, table_name::TableName, +}; +use js_runtime::isolate_pool::IsolatePool; + +use crate::{ + BoxError, ResolvedTable, + catalog::{ + dataset_access::DatasetAccess, + logical::LogicalCatalog, + physical::{Catalog, PhysicalTable}, + }, + sql::{FunctionReference, TableReference}, +}; + +/// Resolved SQL references tuple (table refs, function refs) using partial references. +pub type ResolvedReferences = ( + Vec>, + Vec>, +); + +/// Creates a full catalog with physical data access for SQL query execution. +/// +/// This function builds a complete catalog that includes both logical schemas and physical +/// parquet file locations, enabling actual query execution with DataFusion. +/// +/// ## Where Used +/// +/// This function is used exclusively in the **Query Execution Path**: +/// +/// - **Arrow Flight DoGet** (`crates/services/server/src/flight.rs`): +/// - Called during Arrow Flight `DoGet` phase to execute user queries +/// - Provides physical catalog for streaming query results to clients +/// +/// ## Implementation +/// +/// The function: +/// 1. Extracts table references and function names from the query +/// 2. Calls [`get_logical_catalog`] to resolve datasets and build the logical catalog +/// 3. Queries metadata database for physical parquet locations +/// 4. Constructs physical catalog for query execution +pub async fn create_phy( + dataset_store: &impl DatasetAccess, + data_store: &DataStore, + isolate_pool: &IsolatePool, + refs: ResolvedReferences, +) -> Result { + // Get logical catalog (tables + UDFs) + let logical = create_logical(dataset_store, isolate_pool, refs) + .await + .map_err(CatalogForSqlError::CreateLogical)?; + + // Build physical catalog from resolved tables + let mut physical_tables = Vec::new(); + for table in &logical.tables { + let dataset_ref = table.dataset_reference(); + + let revision = data_store + .get_table_active_revision(dataset_ref, table.name()) + .await + .map_err(|source| CatalogForSqlError::PhysicalTableRetrieval { + dataset: dataset_ref.clone(), + table: table.name().clone(), + source, + })? + .ok_or_else(|| CatalogForSqlError::TableNotSynced { + dataset: dataset_ref.clone(), + table: table.name().clone(), + })?; + + let physical_table = PhysicalTable::from_active_revision( + data_store.clone(), + table.dataset_reference().clone(), + table.dataset_start_block(), + table.table().clone(), + revision, + table.sql_table_ref_schema().to_string(), + ); + physical_tables.push(Arc::new(physical_table)); + } + + Ok(Catalog::new(physical_tables, logical)) +} + +/// Creates a logical catalog for SQL query planning without physical data access. +/// +/// This function builds a logical catalog with schemas only, enabling query plan generation +/// and schema inference without accessing physical parquet files. 
+/// +/// ## Where Used +/// +/// This function is used exclusively in the **Query Execution Path** for the planning phase: +/// +/// - **Arrow Flight GetFlightInfo** (`crates/services/server/src/flight.rs`): +/// - Called to generate query plan and return schema to clients +/// - Fast response without accessing physical data files +/// - Precedes actual query execution which uses `catalog_for_sql` +/// +/// ## Implementation +/// +/// The function analyzes the SQL query to: +/// 1. Extract table references and function names from the query +/// 2. Resolve dataset names to hashes via the dataset store +/// 3. Build logical catalog with schemas and UDFs +/// 4. Return logical catalog for use with `PlanningContext::new()` +/// +/// Unlike `catalog_for_sql`, this does not query the metadata database for physical +/// parquet locations, making it faster for planning-only operations. +pub async fn create_logical( + dataset_store: &impl DatasetAccess, + isolate_pool: &IsolatePool, + refs: ResolvedReferences, +) -> Result { + let (table_refs, func_refs) = refs; + + // Resolve logical catalog using shared helpers + let tables = resolve_tables(dataset_store, table_refs) + .await + .map_err(CreateLogicalCatalogError::ResolveTables)?; + let udfs = resolve_udfs(dataset_store, isolate_pool, func_refs) + .await + .map_err(CreateLogicalCatalogError::ResolveUdfs)?; + + Ok(LogicalCatalog { tables, udfs }) +} + +/// Resolves table references to ResolvedTable instances using dynamic resolution. +/// +/// Processes each table reference, resolves the dataset reference to a hash, +/// loads the dataset, finds the table, and creates a ResolvedTable for catalog construction. +async fn resolve_tables( + dataset_store: &impl DatasetAccess, + refs: impl IntoIterator>, +) -> Result, ResolveTablesError> { + // Use hash-based map to deduplicate datasets and collect resolved tables + // Inner map: table_ref -> ResolvedTable (deduplicates table references) + let mut tables: BTreeMap, ResolvedTable>> = + BTreeMap::new(); + + for table_ref in refs { + match &table_ref { + TableReference::Bare { .. } => { + return Err(ResolveTablesError::UnqualifiedTable { + table_ref: table_ref.to_string(), + }); + } + TableReference::Partial { schema, table } => { + // Schema is already parsed as PartialReference, convert to Reference + let reference: Reference = schema.as_ref().clone().into(); + + // Resolve reference to hash reference + let dataset_ref = dataset_store + .resolve_revision(&reference) + .await + .map_err(|err| ResolveTablesError::ResolveDatasetReference { + reference: reference.clone(), + source: err, + })? 
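+                    // `resolve_revision` yields `None` when no dataset matches the reference;
+                    // the `.ok_or_else` below reports that as a resolution failure.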
+ .ok_or_else(|| ResolveTablesError::ResolveDatasetReference { + reference: reference.clone(), + source: format!("Dataset '{}' not found", reference).into(), + })?; + + // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) + let Entry::Vacant(entry) = tables + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(table_ref.clone()) + else { + continue; + }; + + // Load dataset by hash reference (cached by dataset_store) + let dataset = dataset_store + .get_dataset(&dataset_ref) + .await + .map_err(|err| ResolveTablesError::LoadDataset { + reference: dataset_ref.clone(), + source: err, + })?; + + // Find table in dataset + let dataset_table = dataset + .tables + .iter() + .find(|t| t.name() == table) + .ok_or_else(|| ResolveTablesError::TableNotFoundInDataset { + table_name: table.as_ref().clone(), + reference: dataset_ref.clone(), + })?; + + // Create ResolvedTable + let resolved_table = ResolvedTable::new( + dataset_table.clone(), + schema.to_string(), + dataset_ref.clone(), + dataset.start_block, + ); + + // Insert into vacant entry + entry.insert(resolved_table); + } + } + } + + // Flatten to Vec + Ok(tables + .into_values() + .flat_map(|map| map.into_values()) + .collect()) +} + +/// Resolves function references to ScalarUDF instances using dynamic resolution. +/// +/// Processes each function reference, resolves the dataset reference, +/// loads the dataset, and retrieves or creates the UDF. +async fn resolve_udfs( + dataset_store: &impl DatasetAccess, + isolate_pool: &IsolatePool, + refs: impl IntoIterator>, +) -> Result, ResolveUdfsError> { + // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference + // Inner map ensures deduplication: multiple function references to the same UDF share one instance + let mut udfs: BTreeMap, ScalarUDF>> = + BTreeMap::new(); + + for func_ref in refs { + match &func_ref { + // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion) + FunctionReference::Bare { .. } => continue, + FunctionReference::Qualified { schema, function } => { + // Schema is already parsed as PartialReference, convert to Reference + let reference: Reference = schema.as_ref().clone().into(); + + // Resolve reference to hash reference + let dataset_ref = dataset_store + .resolve_revision(&reference) + .await + .map_err(|err| ResolveUdfsError::ResolveDatasetReference { + reference: reference.clone(), + source: err, + })? + .ok_or_else(|| ResolveUdfsError::ResolveDatasetReference { + reference: reference.clone(), + source: format!("Dataset '{}' not found", reference).into(), + })?; + + // Check vacancy BEFORE loading dataset + let Entry::Vacant(entry) = udfs + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(func_ref.clone()) + else { + continue; + }; + + // Only load dataset if UDF not already resolved + let dataset = dataset_store + .get_dataset(&dataset_ref) + .await + .map_err(|err| ResolveUdfsError::LoadDataset { + reference: dataset_ref.clone(), + source: err, + })?; + + // Get the UDF for this function reference + let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { + dataset_store + .eth_call_for_dataset(&schema.to_string(), &dataset) + .await + .map_err(|err| ResolveUdfsError::EthCallUdfCreation { + reference: dataset_ref.clone(), + source: err, + })? + .ok_or_else(|| ResolveUdfsError::EthCallNotAvailable { + reference: dataset_ref.clone(), + })? 
+ } else { + dataset + .function_by_name(schema.to_string(), function, isolate_pool.clone()) + .ok_or_else(|| ResolveUdfsError::FunctionNotFoundInDataset { + function_name: func_ref.to_string(), + reference: dataset_ref, + })? + }; + + entry.insert(udf); + } + } + } + + // Flatten to Vec + Ok(udfs + .into_values() + .flat_map(|map| map.into_values()) + .collect()) +} + +// Error types + +#[derive(Debug, thiserror::Error)] +pub enum CatalogForSqlError { + /// Failed to create logical catalog. + /// + /// This occurs when: + /// - Dataset references cannot be resolved + /// - Tables or functions are not found in datasets + #[error("Failed to create logical catalog")] + CreateLogical(#[source] CreateLogicalCatalogError), + + /// Failed to retrieve physical table metadata from the metadata database. + /// + /// This occurs when querying the metadata database for the active physical + /// location of a table fails due to database connection issues, query errors, + /// or other database-related problems. + #[error("Failed to retrieve physical table metadata for table {dataset}.{table}")] + PhysicalTableRetrieval { + dataset: HashReference, + table: TableName, + #[source] + source: amp_data_store::GetTableActiveRevisionError, + }, + + /// Table has not been synced and no physical location exists. + /// + /// This occurs when attempting to load a physical catalog for a table that + /// has been defined but has not yet been dumped/synced to storage. The table + /// exists in the dataset definition but has no physical parquet files. + #[error("Table {dataset}.{table} has not been synced")] + TableNotSynced { + dataset: HashReference, + table: TableName, + }, +} + +impl CatalogForSqlError { + /// Returns true if this error is due to a table not being found in a dataset. + pub fn is_table_not_found(&self) -> bool { + matches!( + self, + CatalogForSqlError::CreateLogical(CreateLogicalCatalogError::ResolveTables( + ResolveTablesError::TableNotFoundInDataset { .. } + )) + ) + } +} + +/// Errors specific to create_logical operations +/// +/// This error type is used by `create_logical()` to create +/// a logical catalog for Arrow Flight query planning (GetFlightInfo). +#[derive(Debug, thiserror::Error)] +pub enum CreateLogicalCatalogError { + /// Failed to resolve table references to ResolvedTable instances. + #[error(transparent)] + ResolveTables(ResolveTablesError), + + /// Failed to resolve function references to UDF instances. + #[error(transparent)] + ResolveUdfs(ResolveUdfsError), +} + +/// Errors that can occur when resolving table references. +#[derive(Debug, thiserror::Error)] +pub enum ResolveTablesError { + /// Table is not qualified with a schema/dataset name. + /// + /// All tables must be qualified with a dataset reference in the schema portion. + /// Unqualified tables (e.g., just `table_name`) are not allowed. + #[error("Unqualified table '{table_ref}', all tables must be qualified with a dataset")] + UnqualifiedTable { + /// The unqualified table reference string + table_ref: String, + }, + + /// Failed to resolve dataset reference to a hash reference. + /// + /// This occurs when the dataset store cannot resolve a reference to its + /// corresponding content hash. 
Common causes include: + /// - Dataset does not exist in the store + /// - Version tag not found + /// - Storage backend errors + /// - Invalid reference format + /// - Database connection issues + #[error("Failed to resolve dataset reference '{reference}'")] + ResolveDatasetReference { + /// The dataset reference that failed to resolve + reference: Reference, + #[source] + source: BoxError, + }, + + /// Failed to load dataset from the dataset store. + /// + /// This occurs when loading a dataset definition fails. Common causes include: + /// - Dataset does not exist in the store + /// - Dataset manifest is invalid or corrupted + /// - Unsupported dataset kind + /// - Storage backend errors when reading the dataset + /// - Manifest file not found in object store + #[error("Failed to load dataset '{reference}'")] + LoadDataset { + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Table not found in dataset. + /// + /// This occurs when the table name is referenced in the SQL query but the + /// dataset does not contain a table with that name. + #[error("Table '{table_name}' not found in dataset '{reference}'")] + TableNotFoundInDataset { + /// The name of the table that was not found + table_name: TableName, + /// The hash reference of the dataset that was searched + reference: HashReference, + }, +} + +/// Errors that can occur when resolving UDF references. +#[derive(Debug, thiserror::Error)] +pub enum ResolveUdfsError { + /// Failed to resolve dataset reference to a hash reference. + /// + /// This occurs when the dataset store cannot resolve a reference to its + /// corresponding content hash. Common causes include: + /// - Dataset does not exist in the store + /// - Version tag not found + /// - Storage backend errors + /// - Invalid reference format + /// - Database connection issues + #[error("Failed to resolve dataset reference '{reference}'")] + ResolveDatasetReference { + /// The dataset reference that failed to resolve + reference: Reference, + #[source] + source: BoxError, + }, + + /// Failed to load dataset from the dataset store. + /// + /// This occurs when loading a dataset definition fails. Common causes include: + /// - Dataset does not exist in the store + /// - Dataset manifest is invalid or corrupted + /// - Unsupported dataset kind + /// - Storage backend errors when reading the dataset + /// - Manifest file not found in object store + #[error("Failed to load dataset '{reference}'")] + LoadDataset { + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Failed to create ETH call UDF for dataset referenced in function name. + /// + /// This occurs when creating the eth_call user-defined function for a function fails: + /// - Invalid provider configuration for the dataset + /// - Provider connection issues + /// - Dataset is not an EVM RPC dataset but eth_call was requested + #[error("Failed to create ETH call UDF for dataset '{reference}'")] + EthCallUdfCreation { + /// The hash reference of the dataset for which eth_call UDF creation failed + reference: HashReference, + #[source] + source: BoxError, + }, + + /// eth_call function not available for dataset. + /// + /// This occurs when the eth_call function is referenced in SQL but the + /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured). 
+ #[error("Function 'eth_call' not available for dataset '{reference}'")] + EthCallNotAvailable { + /// The hash reference of the dataset that does not support eth_call + reference: HashReference, + }, + + /// Function not found in dataset. + /// + /// This occurs when a function is referenced in the SQL query but the + /// dataset does not contain a function with that name. + #[error("Function '{function_name}' not found in dataset '{reference}'")] + FunctionNotFoundInDataset { + /// The name of the function that was not found + function_name: String, + /// The hash reference of the dataset that was searched + reference: HashReference, + }, +} diff --git a/crates/core/common/src/catalog/schema_inference.rs b/crates/core/common/src/catalog/schema_inference.rs new file mode 100644 index 000000000..9e5731d49 --- /dev/null +++ b/crates/core/common/src/catalog/schema_inference.rs @@ -0,0 +1,448 @@ +//! Schema inference catalog construction for derived dataset validation +//! +//! This module provides catalog creation for schema inference when validating +//! derived dataset manifests via the admin API. +//! +//! ## Key Functions +//! +//! - [`create`] - Creates a LogicalCatalog for SQL validation +//! - [`resolve_tables_with_deps`] - Resolves table references using pre-resolved dependencies +//! - [`resolve_udfs_with_deps`] - Resolves function references to ScalarUDF instances + +use std::{ + collections::{BTreeMap, btree_map::Entry}, + sync::Arc, +}; + +use datafusion::logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}; +use datasets_common::{ + deps::alias::{DepAlias, DepAliasOrSelfRef, SELF_REF_KEYWORD}, + func_name::{ETH_CALL_FUNCTION_NAME, FuncName}, + hash::Hash, + hash_reference::HashReference, + manifest::Function, + table_name::TableName, +}; +use js_runtime::isolate_pool::IsolatePool; + +use crate::{ + BoxError, + catalog::{ + dataset_access::DatasetAccess, + logical::{LogicalCatalog, ResolvedTable}, + }, + js_udf::JsUdf, + sql::{FunctionReference, TableReference}, +}; + +/// Map of table names to (table references, function references) +pub type TableReferencesMap = BTreeMap< + TableName, + ( + Vec>, + Vec>, + ), +>; + +/// Creates a LogicalCatalog for SQL validation with pre-resolved dependencies. +/// +/// This function is used during derived dataset manifest validation to create a logical +/// catalog that can validate SQL queries against specific dataset versions. +/// +/// ## Process +/// +/// 1. Flattens table references from the references map +/// 2. Resolves all table references to ResolvedTable instances +/// 3. Flattens function references from the references map +/// 4. Resolves all function references to ScalarUDF instances +/// 5. 
Creates and returns a LogicalCatalog +/// +/// ## Related Functions +/// +/// - [`resolve_tables`] - Resolves table references to ResolvedTable instances +/// - [`resolve_udfs`] - Resolves function references to UDFs +pub async fn create( + dataset_store: &impl DatasetAccess, + isolate_pool: IsolatePool, + manifest_deps: BTreeMap, + manifest_udfs: BTreeMap, + refs: TableReferencesMap, +) -> Result { + let table_refs: Vec<_> = refs + .iter() + .flat_map(|(name, (table_refs, _))| { + table_refs.iter().map(move |table_ref| (name, table_ref)) + }) + .collect(); + + let tables = resolve_tables(dataset_store, &manifest_deps, table_refs) + .await + .map_err(CreateLogicalCatalogError::ResolveTables)?; + + let func_refs: Vec<_> = refs + .iter() + .flat_map(|(name, (_, func_refs))| func_refs.iter().map(move |func_ref| (name, func_ref))) + .collect(); + + let udfs = resolve_udfs( + dataset_store, + isolate_pool, + &manifest_deps, + &manifest_udfs, + func_refs, + ) + .await + .map_err(CreateLogicalCatalogError::ResolveUdfs)?; + + Ok(LogicalCatalog { tables, udfs }) +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateLogicalCatalogError { + /// Failed to resolve table references to ResolvedTable instances + #[error(transparent)] + ResolveTables(ResolveTablesError), + + /// Failed to resolve function references to UDF instances + #[error(transparent)] + ResolveUdfs(ResolveUdfsError), +} + +/// Resolves table references to ResolvedTable instances using pre-resolved dependencies. +/// +/// Processes each table reference across all tables, looks up datasets by hash, finds tables +/// within datasets, and creates ResolvedTable instances for catalog construction. +async fn resolve_tables<'a>( + dataset_store: &impl DatasetAccess, + manifest_deps: &BTreeMap, + refs: impl IntoIterator)> + 'a, +) -> Result, ResolveTablesError> { + // Use hash-based map to deduplicate datasets across ALL tables + // Inner map: table_ref -> ResolvedTable (deduplicates table references) + let mut tables: BTreeMap, ResolvedTable>> = + BTreeMap::new(); + + // Process all table references - fail fast on first error + for (table_name, table_ref) in refs { + match table_ref { + TableReference::Bare { .. 
} => { + return Err(ResolveTablesError::UnqualifiedTable { + table_name: table_name.clone(), + table_ref: table_ref.to_string(), + }); + } + TableReference::Partial { schema, table } => { + // Schema is already parsed as DepAlias, lookup in dependencies map + let dataset_ref = manifest_deps.get(schema.as_ref()).ok_or_else(|| { + ResolveTablesError::DependencyAliasNotFound { + table_name: table_name.clone(), + alias: schema.as_ref().clone(), + } + })?; + + // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) + let Entry::Vacant(entry) = tables + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(table_ref.clone()) + else { + continue; + }; + + // Load dataset by hash (cached by dataset_store) + let dataset = dataset_store + .get_dataset(dataset_ref) + .await + .map_err(|err| ResolveTablesError::GetDataset { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + source: err, + })?; + + // Find table in dataset + let dataset_table = dataset + .tables + .iter() + .find(|t| t.name() == table) + .ok_or_else(|| ResolveTablesError::TableNotFoundInDataset { + table_name: table_name.clone(), + referenced_table_name: table.as_ref().clone(), + reference: dataset_ref.clone(), + })?; + + // Create ResolvedTable + let resolved_table = ResolvedTable::new( + dataset_table.clone(), + schema.to_string(), + dataset_ref.clone(), + dataset.start_block, + ); + + // Insert into vacant entry + entry.insert(resolved_table); + } + } + } + + // Flatten to Vec + Ok(tables + .into_values() + .flat_map(|map| map.into_values()) + .collect()) +} + +/// Resolves function references to ScalarUDF instances using pre-resolved dependencies. +/// +/// Processes each function reference across all tables: +/// - For external dependencies (dep.function): loads dataset and retrieves UDF +/// - For self-references (self.function): creates JsUdf from the manifest's function definition +/// - Skips bare functions (built-in DataFusion/Amp functions) +async fn resolve_udfs<'a>( + dataset_store: &impl DatasetAccess, + isolate_pool: IsolatePool, + manifest_deps: &BTreeMap, + manifest_udfs: &BTreeMap, + refs: impl IntoIterator)> + 'a, +) -> Result, ResolveUdfsError> { + // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference + // Inner map ensures deduplication: multiple function references to the same UDF share one instance + let mut udfs: BTreeMap, ScalarUDF>> = + BTreeMap::new(); + // Track UDFs defined in this manifest (bare functions and self-references) - separate from dependency functions + // Ensures deduplication: multiple references to the same function share one instance + let mut self_udfs: BTreeMap, ScalarUDF> = BTreeMap::new(); + + // Process all function references - fail fast on first error + for (table_name, func_ref) in refs { + match func_ref { + // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion) + FunctionReference::Bare { function: _ } => { + continue; + } + FunctionReference::Qualified { schema, function } => { + // Match on schema type: DepAlias (external dependency) or SelfRef (same-dataset function) + match schema.as_ref() { + DepAliasOrSelfRef::DepAlias(dep_alias) => { + // External dependency reference - lookup in dependencies map + let dataset_ref = manifest_deps.get(dep_alias).ok_or_else(|| { + ResolveUdfsError::DependencyAliasNotFound { + table_name: table_name.clone(), + alias: dep_alias.clone(), + } + })?; + + // Check vacancy BEFORE loading dataset + let 
Entry::Vacant(entry) = udfs + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(func_ref.clone()) + else { + continue; + }; + + // Only load dataset if UDF not already resolved (cached by dataset_store) + let dataset = + dataset_store + .get_dataset(dataset_ref) + .await + .map_err(|err| ResolveUdfsError::GetDataset { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + source: err, + })?; + + // Get the UDF for this function reference + let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { + dataset_store + .eth_call_for_dataset(&schema.to_string(), &dataset) + .await + .map_err(|err| ResolveUdfsError::EthCallUdfCreation { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + source: err, + })? + .ok_or_else(|| ResolveUdfsError::EthCallNotAvailable { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + })? + } else { + dataset + .function_by_name( + schema.to_string(), + function, + IsolatePool::dummy(), + ) + .ok_or_else(|| ResolveUdfsError::FunctionNotFoundInDataset { + table_name: table_name.clone(), + function_name: (**function).clone(), + reference: dataset_ref.clone(), + })? + }; + + entry.insert(udf); + } + DepAliasOrSelfRef::SelfRef => { + // Same-dataset function reference (self.function_name) + // Look up function in the functions map (defined in this dataset) + let func_def = manifest_udfs.get(function).ok_or_else(|| { + ResolveUdfsError::SelfReferencedFunctionNotFound { + table_name: table_name.clone(), + function_name: (**function).clone(), + } + })?; + + // Skip if function reference is already resolved (optimization) + let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else { + continue; + }; + + // Create UDF from Function definition using JsUdf + // Use "self" as schema qualifier to preserve case sensitivity + let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new( + isolate_pool.clone(), + Some(SELF_REF_KEYWORD.to_string()), // Schema = "self" + func_def.source.source.clone(), + func_def.source.filename.clone().into(), + Arc::from(function.as_ref().as_str()), + func_def + .input_types + .iter() + .map(|dt| dt.clone().into_arrow()) + .collect(), + func_def.output_type.clone().into_arrow(), + ))) + .into_scalar_udf(); + + entry.insert(udf); + } + } + } + } + } + + // Flatten to Vec + Ok(self_udfs + .into_values() + .chain(udfs.into_values().flat_map(|map| map.into_values())) + .collect()) +} + +#[derive(Debug, thiserror::Error)] +pub enum ResolveTablesError { + /// Table is not qualified with a schema/dataset name + #[error( + "In table '{table_name}': Unqualified table '{table_ref}', all tables must be qualified with a dataset" + )] + UnqualifiedTable { + /// The table being processed when the error occurred + table_name: TableName, + /// The unqualified table reference string + table_ref: String, + }, + + /// Dependency alias not found when processing table reference + #[error( + "In table '{table_name}': Dependency alias '{alias}' referenced in table but not provided in dependencies" + )] + DependencyAliasNotFound { + /// The table being processed when the error occurred + table_name: TableName, + /// The dependency alias that was not found in the dependencies map + alias: DepAlias, + }, + + /// Failed to retrieve dataset from store when loading dataset for table reference + #[error("In table '{table_name}': Failed to retrieve dataset '{reference}'")] + GetDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset that failed to 
load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Table not found in dataset + #[error( + "In table '{table_name}': Table '{referenced_table_name}' not found in dataset '{reference}'" + )] + TableNotFoundInDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The name of the table that was not found in the dataset + referenced_table_name: TableName, + /// The hash reference of the dataset where the table was not found + reference: HashReference, + }, +} + +#[derive(Debug, thiserror::Error)] +pub enum ResolveUdfsError { + /// Dependency alias not found when processing function reference + #[error( + "In table '{table_name}': Dependency alias '{alias}' referenced in function but not provided in dependencies" + )] + DependencyAliasNotFound { + /// The table being processed when the error occurred + table_name: TableName, + /// The dependency alias that was not found in the dependencies map + alias: DepAlias, + }, + + /// Failed to retrieve dataset from store when loading dataset for function + #[error("In table '{table_name}': Failed to retrieve dataset '{reference}' for function")] + GetDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Failed to create ETH call UDF for dataset referenced in function name + #[error( + "In table '{table_name}': Failed to create ETH call UDF for dataset '{reference}' for function" + )] + EthCallUdfCreation { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset for which the eth_call UDF creation failed + reference: HashReference, + #[source] + source: BoxError, + }, + + /// eth_call function not available for dataset + #[error("In table '{table_name}': Function 'eth_call' not available for dataset '{reference}'")] + EthCallNotAvailable { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset that does not support eth_call + reference: HashReference, + }, + + /// Function not found in dataset + #[error( + "In table '{table_name}': Function '{function_name}' not found in dataset '{reference}'" + )] + FunctionNotFoundInDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The name of the function that was not found + function_name: FuncName, + /// The hash reference of the dataset where the function was not found + reference: HashReference, + }, + + /// Self-referenced function not found in manifest's functions map. + #[error( + "In table '{table_name}': Self-referenced function '{function_name}' not found in manifest functions" + )] + SelfReferencedFunctionNotFound { + /// The table containing the SQL query with the invalid reference + table_name: TableName, + /// The function name that was referenced but not defined + function_name: FuncName, + }, +} diff --git a/crates/core/common/src/catalog/sql.rs b/crates/core/common/src/catalog/sql.rs deleted file mode 100644 index 2972821a7..000000000 --- a/crates/core/common/src/catalog/sql.rs +++ /dev/null @@ -1,561 +0,0 @@ -//! SQL catalog creation and query planning functions. -//! -//! This module provides functions for building catalogs and planning contexts from SQL queries. -//! Each function serves a specific data path in the Amp architecture, with clear separation -//! 
between validation, planning, and execution operations. -//! -//! # Function-to-Data-Path Mapping -//! -//! | Function | Schema Endpoint | Manifest Validation | Query Planning | Query Execution | Derived Dataset | Raw Dataset | -//! |-----------------------------------------------------|------------------|---------------------|------------------|------------------|------------------|-------------| -//! | [`planning_ctx_for_sql_tables_with_deps_and_funcs`] | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | -//! | [`planning_ctx_for_sql`] | ❌ | ❌ | ✅ **EXCLUSIVE** | ❌ | ❌ | ❌ | -//! | [`catalog_for_sql`] | ❌ | ❌ | ❌ | ✅ **EXCLUSIVE** | ❌ | ❌ | -//! | [`catalog_for_sql_with_deps`] | ❌ | ❌ | ❌ | ❌ | ✅ **EXCLUSIVE** | ❌ | -//! | [`get_logical_catalog`] | ❌ | ❌ | ❌ | ✅ (indirect) | ❌ | ❌ | -//! | [`get_logical_catalog_with_deps_and_funcs`] | ❌ | ❌ | ❌ | ❌ | ✅ (indirect) | ❌ | -//! -//! # Data Paths -//! -//! ## 1. Manifest Validation Path -//! -//! - **Purpose**: Validate dataset manifests without data access -//! - **Function**: [`planning_ctx_for_sql_tables_with_deps`] -//! - **Entry Points**: -//! - `POST /schema` endpoint (`crates/services/admin-api/src/handlers/schema.rs`) -//! - `POST /manifests` endpoint via manifest validation (`crates/services/admin-api/src/handlers/manifests/register.rs`) -//! - `POST /datasets` endpoint via manifest validation (`crates/services/admin-api/src/handlers/datasets/register.rs`) -//! - **Characteristics**: Multi-table validation, pre-resolved dependencies, no physical data -//! -//! ## 2. Query Planning Path -//! -//! - **Purpose**: Generate query plans and schemas without execution -//! - **Function**: [`planning_ctx_for_sql`] -//! - **Entry**: Arrow Flight `GetFlightInfo` (`crates/services/server/src/flight.rs`) -//! - **Characteristics**: Fast schema response, logical catalog only, dynamic dataset resolution -//! -//! ## 3. Query Execution Path (Arrow Flight) -//! -//! - **Purpose**: Execute user queries via Arrow Flight -//! - **Function**: [`catalog_for_sql`] (calls [`get_logical_catalog`] internally) -//! - **Entry**: Arrow Flight `DoGet` (`crates/services/server/src/flight.rs`) -//! - **Characteristics**: Full catalog with physical parquet locations, dynamic dataset resolution, streaming results -//! - **Resolution Strategy**: Resolves dataset references to hashes at query time (supports "latest" and version tags) -//! -//! ## 4. Derived Dataset Execution Path -//! -//! - **Purpose**: Execute SQL to create derived datasets during extraction -//! - **Function**: [`catalog_for_sql_with_deps`] (calls [`get_logical_catalog_with_deps_and_funcs`] internally) -//! - **Entry**: Worker-based extraction for SQL datasets (`crates/core/dump/src/core/sql_dump.rs`) -//! - **Characteristics**: Full catalog with physical parquet locations, pre-resolved dependencies, writes parquet files -//! - **Resolution Strategy**: Uses locked dataset hashes from manifest dependencies (deterministic, reproducible) -//! -//! # Key Insights -//! -//! - **Clean separation**: Each public function serves exactly one primary data path -//! - **Dependency handling**: Two parallel execution paths with different resolution strategies: -//! - **Dynamic resolution** (`catalog_for_sql`): For user queries that reference datasets by name/version -//! - **Pre-resolved dependencies** (`catalog_for_sql_with_deps`): For derived datasets with locked dependencies -//! - **Function duplication**: `*_with_deps` variants avoid dual-mode logic and maintain clear boundaries -//! 
- **Lazy UDF loading**: All functions implement lazy UDF loading for optimal performance -//! - **No raw dataset overlap**: Raw dataset dumps don't use these planning functions -//! - **Two-phase resolution**: Functions without deps use a two-phase pattern: -//! 1. **Resolution phase**: `Reference` → `resolve_revision()` → `HashReference` -//! 2. **Retrieval phase**: `HashReference` → `get_dataset()` → `Dataset` -//! - **Pre-resolved deps**: Functions with deps receive `HashReference` and skip phase 1 - -use std::{ - collections::{BTreeMap, btree_map::Entry}, - sync::Arc, -}; - -use amp_data_store::DataStore; -use datafusion::{logical_expr::ScalarUDF, sql::parser::Statement}; -use datasets_common::{ - func_name::ETH_CALL_FUNCTION_NAME, hash::Hash, partial_reference::PartialReference, - reference::Reference, -}; -use js_runtime::isolate_pool::IsolatePool; - -use super::{ - dataset_access::DatasetAccess, - errors::{CatalogForSqlError, GetLogicalCatalogError, PlanningCtxForSqlError}, - logical::LogicalCatalog, - physical::{Catalog, PhysicalTable}, -}; -use crate::{ - PlanningContext, ResolvedTable, - query_context::QueryEnv, - sql::{ - FunctionReference, TableReference, resolve_function_references, resolve_table_references, - }, -}; -/// Creates a full catalog with physical data access for SQL query execution. -/// -/// This function builds a complete catalog that includes both logical schemas and physical -/// parquet file locations, enabling actual query execution with DataFusion. -/// -/// ## Where Used -/// -/// This function is used exclusively in the **Query Execution Path**: -/// -/// - **Arrow Flight DoGet** (`crates/services/server/src/flight.rs`): -/// - Called during Arrow Flight `DoGet` phase to execute user queries -/// - Provides physical catalog for streaming query results to clients -/// -/// For derived dataset execution, use [`catalog_for_sql_with_deps`] instead. -/// -/// ## Implementation -/// -/// The function: -/// 1. Extracts table references and function names from the query -/// 2. Calls [`get_logical_catalog`] to resolve datasets and build the logical catalog -/// 3. Queries metadata database for physical parquet locations -/// 4. Constructs physical catalog for query execution -pub async fn catalog_for_sql( - dataset_store: &impl DatasetAccess, - data_store: &DataStore, - query: &Statement, - env: QueryEnv, -) -> Result { - let table_refs = resolve_table_references::(query) - .map_err(CatalogForSqlError::TableReferenceResolution)?; - let func_refs = resolve_function_references::(query) - .map_err(CatalogForSqlError::FunctionReferenceResolution)?; - - let logical_catalog = - get_logical_catalog(dataset_store, table_refs, func_refs, &env.isolate_pool) - .await - .map_err(CatalogForSqlError::GetLogicalCatalog)?; - - let mut tables = Vec::new(); - for table in &logical_catalog.tables { - let dataset_ref = table.dataset_reference(); - - let revision = data_store - .get_table_active_revision(dataset_ref, table.name()) - .await - .map_err(|source| CatalogForSqlError::PhysicalTableRetrieval { - dataset: dataset_ref.clone(), - table: table.name().clone(), - source, - })? 
- .ok_or_else(|| CatalogForSqlError::TableNotSynced { - dataset: dataset_ref.clone(), - table: table.name().clone(), - })?; - - let physical_table = PhysicalTable::from_active_revision( - data_store.clone(), - table.dataset_reference().clone(), - table.dataset_start_block(), - table.table().clone(), - revision, - table.sql_table_ref_schema().to_string(), - ); - tables.push(Arc::new(physical_table)); - } - - Ok(Catalog::new(tables, logical_catalog)) -} - -/// Creates a full catalog with physical data access for SQL query execution with pre-resolved dependencies. -/// -/// This function builds a complete catalog that includes both logical schemas and physical -/// parquet file locations, enabling actual query execution with DataFusion. Unlike -/// `catalog_for_sql`, this function accepts pre-resolved dependencies from a derived dataset -/// manifest, ensuring that SQL schema references map to the exact dataset versions specified -/// in the manifest's dependencies. -/// -/// ## Where Used -/// -/// This function is used exclusively in the **Derived Dataset Execution Path**: -/// -/// - **`crates/core/dump/src/core/sql_dump.rs`**: -/// - Called during worker-based extraction to execute SQL-defined derived datasets -/// - Uses dependencies from the derived dataset manifest -/// - Writes query results as parquet files to object storage -/// -/// ## Implementation -/// -/// The function analyzes the SQL query to: -/// 1. Extract table references and function names from the query -/// 2. Map SQL schema references to pre-resolved dependency aliases -/// 3. Load datasets by their locked hashes from the manifest dependencies -/// 4. Build logical catalog with schemas and UDFs -/// 5. Query metadata database for physical parquet locations -/// 6. Construct physical catalog for query execution -/// -/// ## Dependencies Parameter -/// -/// The `dependencies` parameter is a map from dependency aliases (as used in SQL schema qualifiers) -/// to fully-qualified dataset names and their content hashes. This ensures that: -/// - SQL queries use the exact dataset versions declared in the manifest -/// - No dynamic version resolution occurs during execution -/// - Derived datasets are reproducible and deterministic -/// -/// Creates a planning context for SQL query planning without physical data access. -/// -/// This function builds a logical catalog with schemas only, enabling query plan generation -/// and schema inference without accessing physical parquet files. -/// -/// ## Where Used -/// -/// This function is used exclusively in the **Query Execution Path** for the planning phase: -/// -/// - **Arrow Flight GetFlightInfo** (`crates/services/server/src/flight.rs`): -/// - Called to generate query plan and return schema to clients -/// - Fast response without accessing physical data files -/// - Precedes actual query execution which uses `catalog_for_sql` -/// -/// ## Implementation -/// -/// The function analyzes the SQL query to: -/// 1. Extract table references and function names from the query -/// 2. Resolve dataset names to hashes via the dataset store -/// 3. Build logical catalog with schemas and UDFs -/// 4. Return planning context for DataFusion query planning -/// -/// Unlike `catalog_for_sql`, this does not query the metadata database for physical -/// parquet locations, making it faster for planning-only operations. 
-pub async fn planning_ctx_for_sql( - store: &impl DatasetAccess, - query: &Statement, -) -> Result { - // Get table and function references from the SQL query - let table_refs = resolve_table_references::(query) - .map_err(PlanningCtxForSqlError::TableReferenceResolution)?; - let func_refs = resolve_function_references::(query) - .map_err(PlanningCtxForSqlError::FunctionReferenceResolution)?; - - // Use hash-based map to deduplicate datasets and collect resolved tables - // Inner map: table_ref -> ResolvedTable (deduplicates table references) - let mut tables: BTreeMap> = BTreeMap::new(); - // Track UDFs separately from datasets - outer key: dataset hash, inner key: function reference - // Inner map ensures deduplication: multiple function references to the same UDF share one instance - let mut udfs: BTreeMap> = BTreeMap::new(); - - // Part 1: Process table references - for table_ref in table_refs { - match &table_ref { - TableReference::Bare { .. } => { - return Err(PlanningCtxForSqlError::UnqualifiedTable { - table_ref: table_ref.to_string(), - }); - } - TableReference::Partial { schema, table } => { - // Schema is already parsed as PartialReference, convert to Reference - let reference: Reference = schema.as_ref().clone().into(); - - // Resolve reference to hash reference - let hash_ref = store - .resolve_revision(&reference) - .await - .map_err(|err| PlanningCtxForSqlError::ResolveDatasetReference { - reference: reference.clone(), - source: err, - })? - .ok_or_else(|| PlanningCtxForSqlError::ResolveDatasetReference { - reference: reference.clone(), - source: format!("Dataset '{}' not found", reference).into(), - })?; - - // Convert table_ref to use String schema for internal data structures - let table_ref_string = - TableReference::partial(schema.to_string(), table.as_ref().clone()); - - // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) - let Entry::Vacant(entry) = tables - .entry(hash_ref.hash().clone()) - .or_default() - .entry(table_ref_string.clone()) - else { - continue; - }; - - // Load dataset by hash reference (cached by store) - let dataset = store.get_dataset(&hash_ref).await.map_err(|err| { - PlanningCtxForSqlError::LoadDataset { - reference: hash_ref.clone(), - source: err, - } - })?; - - // Find table in dataset - let dataset_table = dataset - .tables - .iter() - .find(|t| t.name() == table) - .ok_or_else(|| PlanningCtxForSqlError::TableNotFoundInDataset { - table_name: table.as_ref().clone(), - reference: hash_ref.clone(), - })?; - - // Create ResolvedTable - let resolved_table = ResolvedTable::new( - dataset_table.clone(), - schema.to_string(), - hash_ref.clone(), - dataset.start_block, - ); - - // Insert into vacant entry - entry.insert(resolved_table); - } - } - } - - // Part 2: Process function names (load datasets for UDFs only) - for func_ref in func_refs { - match &func_ref { - FunctionReference::Bare { .. } => continue, // Built-in DataFusion function - FunctionReference::Qualified { schema, function } => { - // Schema is already parsed as PartialReference, convert to Reference - let reference: Reference = schema.as_ref().clone().into(); - - // Resolve reference to hash reference - let hash_ref = store - .resolve_revision(&reference) - .await - .map_err(|err| PlanningCtxForSqlError::ResolveDatasetReference { - reference: reference.clone(), - source: err, - })? 
- .ok_or_else(|| PlanningCtxForSqlError::ResolveDatasetReference { - reference: reference.clone(), - source: format!("Dataset '{}' not found", reference).into(), - })?; - - // Load dataset by hash reference (cached by store) - let dataset = store.get_dataset(&hash_ref).await.map_err(|err| { - PlanningCtxForSqlError::LoadDataset { - reference: hash_ref.clone(), - source: err, - } - })?; - - // Convert func_ref to use String schema for internal data structures - let func_ref_string = - FunctionReference::qualified(schema.to_string(), function.as_ref().clone()); - - // Skip if function reference is already resolved (optimization to avoid redundant UDF creation) - let Entry::Vacant(entry) = udfs - .entry(hash_ref.hash().clone()) - .or_default() - .entry(func_ref_string) - else { - continue; - }; - - // Get the UDF for this function reference - let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { - store - .eth_call_for_dataset(&schema.to_string(), &dataset) - .await - .map_err(|err| PlanningCtxForSqlError::EthCallUdfCreation { - reference: hash_ref.clone(), - source: err, - })? - .ok_or_else(|| PlanningCtxForSqlError::EthCallNotAvailable { - reference: hash_ref.clone(), - })? - } else { - dataset - .function_by_name(schema.to_string(), function, IsolatePool::dummy()) - .ok_or_else(|| PlanningCtxForSqlError::FunctionNotFoundInDataset { - function_name: func_ref.to_string(), - reference: hash_ref, - })? - }; - - entry.insert(udf); - } - } - } - - Ok(PlanningContext::new(LogicalCatalog { - tables: tables - .into_values() - .flat_map(|map| map.into_values()) - .collect(), - udfs: udfs - .into_values() - .flat_map(|map| map.into_values()) - .collect(), - })) -} - -/// Internal helper that builds a logical catalog from table and function references. -/// -/// This function resolves dataset references, loads datasets, and constructs a logical -/// catalog containing resolved tables and UDFs for query planning and execution. -/// -/// ## Where Used -/// -/// Called internally by [`catalog_for_sql`] to build the logical catalog before -/// retrieving physical parquet locations from the metadata database. -async fn get_logical_catalog( - store: &impl DatasetAccess, - table_refs: impl IntoIterator>, - func_refs: impl IntoIterator>, - isolate_pool: &IsolatePool, -) -> Result { - let table_refs = table_refs.into_iter().collect::>(); - let func_refs = func_refs.into_iter().collect::>(); - - // Use hash-based map to deduplicate datasets and collect resolved tables - // Inner map: table_ref -> ResolvedTable (deduplicates table references) - let mut tables: BTreeMap> = BTreeMap::new(); - // Track UDFs separately from datasets - outer key: dataset hash, inner key: function reference - // Inner map ensures deduplication: multiple function references to the same UDF share one instance - let mut udfs: BTreeMap> = BTreeMap::new(); - - // Part 1: Process table references - for table_ref in table_refs { - match &table_ref { - TableReference::Bare { .. } => { - return Err(GetLogicalCatalogError::UnqualifiedTable { - table_ref: table_ref.to_string(), - }); - } - TableReference::Partial { schema, table } => { - // Schema is already parsed as PartialReference, convert to Reference - let reference: Reference = schema.as_ref().clone().into(); - - // Resolve reference to hash reference - let hash_ref = store - .resolve_revision(&reference) - .await - .map_err(|err| GetLogicalCatalogError::ResolveDatasetReference { - reference: reference.clone(), - source: err, - })? 
- .ok_or_else(|| GetLogicalCatalogError::ResolveDatasetReference { - reference: reference.clone(), - source: format!("Dataset '{}' not found", reference).into(), - })?; - - // Convert table_ref to use String schema for internal data structures - let table_ref_string = - TableReference::partial(schema.to_string(), table.as_ref().clone()); - - // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) - let Entry::Vacant(entry) = tables - .entry(hash_ref.hash().clone()) - .or_default() - .entry(table_ref_string.clone()) - else { - continue; - }; - - // Load dataset by hash reference (cached by store) - let dataset = store.get_dataset(&hash_ref).await.map_err(|err| { - GetLogicalCatalogError::LoadDataset { - reference: hash_ref.clone(), - source: err, - } - })?; - - // Find table in dataset - let dataset_table = dataset - .tables - .iter() - .find(|t| t.name() == table) - .ok_or_else(|| GetLogicalCatalogError::TableNotFoundInDataset { - table_name: table.as_ref().clone(), - reference: hash_ref.clone(), - })?; - - // Create ResolvedTable - let resolved_table = ResolvedTable::new( - dataset_table.clone(), - schema.to_string(), - hash_ref.clone(), - dataset.start_block, - ); - - // Insert into vacant entry - entry.insert(resolved_table); - } - } - } - - // Part 2: Process function names (load datasets for UDFs only) - for func_ref in func_refs { - match &func_ref { - FunctionReference::Bare { .. } => continue, // Built-in DataFusion function - FunctionReference::Qualified { schema, function } => { - // Schema is already parsed as PartialReference, convert to Reference - let reference: Reference = schema.as_ref().clone().into(); - - // Resolve reference to hash reference - let hash_ref = store - .resolve_revision(&reference) - .await - .map_err(|err| GetLogicalCatalogError::ResolveDatasetReference { - reference: reference.clone(), - source: err, - })? - .ok_or_else(|| GetLogicalCatalogError::ResolveDatasetReference { - reference: reference.clone(), - source: format!("Dataset '{}' not found", reference).into(), - })?; - - // Load dataset by hash reference (cached by store) - let dataset = store.get_dataset(&hash_ref).await.map_err(|err| { - GetLogicalCatalogError::LoadDataset { - reference: hash_ref.clone(), - source: err, - } - })?; - - // Convert func_ref to use String schema for internal data structures - let func_ref_string = - FunctionReference::qualified(schema.to_string(), function.as_ref().clone()); - - // Skip if function reference is already resolved (optimization to avoid redundant UDF creation) - let Entry::Vacant(entry) = udfs - .entry(hash_ref.hash().clone()) - .or_default() - .entry(func_ref_string) - else { - continue; - }; - - // Get the UDF for this function reference - let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { - store - .eth_call_for_dataset(&schema.to_string(), &dataset) - .await - .map_err(|err| GetLogicalCatalogError::EthCallUdfCreation { - reference: hash_ref.clone(), - source: err, - })? - .ok_or_else(|| GetLogicalCatalogError::EthCallNotAvailable { - reference: hash_ref.clone(), - })? - } else { - dataset - .function_by_name(schema.to_string(), function, isolate_pool.clone()) - .ok_or_else(|| GetLogicalCatalogError::FunctionNotFoundInDataset { - function_name: func_ref.to_string(), - reference: hash_ref, - })? 
- }; - - entry.insert(udf); - } - } - } - - Ok(LogicalCatalog { - tables: tables - .into_values() - .flat_map(|map| map.into_values()) - .collect(), - udfs: udfs - .into_values() - .flat_map(|map| map.into_values()) - .collect(), - }) -} diff --git a/crates/core/common/src/catalog/validation_derived_dataset.rs b/crates/core/common/src/catalog/validation_derived_dataset.rs new file mode 100644 index 000000000..c53b6452e --- /dev/null +++ b/crates/core/common/src/catalog/validation_derived_dataset.rs @@ -0,0 +1,399 @@ +//! Validation-specific catalog construction for derived dataset manifests +//! +//! This module provides catalog creation for validating derived dataset manifests +//! during the manifest compilation phase. +//! +//! ## Key Functions +//! +//! - [`create`] - Creates a LogicalCatalog for SQL validation +//! - [`resolve_tables`] - Resolves table references using pre-resolved dependencies +//! - [`resolve_udfs`] - Resolves function references to ScalarUDF instances + +use std::{ + collections::{BTreeMap, btree_map::Entry}, + sync::Arc, +}; + +use datafusion::logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}; +use datasets_common::{ + deps::alias::{DepAlias, DepAliasOrSelfRef, SELF_REF_KEYWORD}, + func_name::{ETH_CALL_FUNCTION_NAME, FuncName}, + hash::Hash, + hash_reference::HashReference, + manifest::Function, + table_name::TableName, +}; +use js_runtime::isolate_pool::IsolatePool; + +use crate::{ + BoxError, + catalog::{ + dataset_access::DatasetAccess, + logical::{LogicalCatalog, ResolvedTable}, + }, + js_udf::JsUdf, + sql::{FunctionReference, TableReference}, +}; + +/// Map of table names to their SQL references (table refs and function refs) using dependency aliases or self-references. +pub type TableReferencesMap = BTreeMap< + TableName, + ( + Vec>, + Vec>, + ), +>; + +/// Creates a logical catalog for SQL validation using pre-resolved dependencies and functions. +/// +/// Builds a unified logical catalog from table and function references across multiple tables, +/// resolving dependency aliases to datasets for schema-only validation (no physical data access). 
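+///
+/// Illustrative call shape (a sketch only, not part of this change; `store`, `isolate_pool`,
+/// `deps`, `udf_defs`, and `refs` are hypothetical bindings assumed to come from manifest
+/// compilation):
+///
+/// ```ignore
+/// // deps:     dependency alias -> locked HashReference map from the manifest
+/// // udf_defs: function name -> Function definition map from the manifest
+/// // refs:     per-table SQL table/function references (TableReferencesMap)
+/// let catalog = create(&store, isolate_pool, deps, udf_defs, refs).await?;
+/// // `catalog` holds resolved tables and UDFs for schema-only validation.
+/// ```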
+/// +/// Delegates to specialized helpers: +/// - [`resolve_tables`] - Resolves table references to `ResolvedTable` instances +/// - [`resolve_udfs`] - Resolves function references to UDFs +pub async fn create( + dataset_store: &impl DatasetAccess, + isolate_pool: IsolatePool, + manifest_deps: BTreeMap, + manifest_udfs: BTreeMap, + refs: TableReferencesMap, +) -> Result { + let table_refs: Vec<_> = refs + .iter() + .flat_map(|(name, (table_refs, _))| { + table_refs.iter().map(move |table_ref| (name, table_ref)) + }) + .collect(); + + let tables = resolve_tables(dataset_store, &manifest_deps, table_refs) + .await + .map_err(CreateLogicalCatalogError::ResolveTables)?; + + let func_refs: Vec<_> = refs + .iter() + .flat_map(|(name, (_, func_refs))| func_refs.iter().map(move |func_ref| (name, func_ref))) + .collect(); + + let udfs = resolve_udfs( + dataset_store, + isolate_pool, + &manifest_deps, + &manifest_udfs, + func_refs, + ) + .await + .map_err(CreateLogicalCatalogError::ResolveUdfs)?; + + Ok(LogicalCatalog { tables, udfs }) +} + +#[derive(Debug, thiserror::Error)] +pub enum CreateLogicalCatalogError { + /// Failed to resolve table references to ResolvedTable instances + #[error(transparent)] + ResolveTables(ResolveTablesError), + + /// Failed to resolve function references to UDF instances + #[error(transparent)] + ResolveUdfs(ResolveUdfsError), +} + +/// Resolves table references to ResolvedTable instances using pre-resolved dependencies. +async fn resolve_tables<'a>( + dataset_store: &impl DatasetAccess, + manifest_deps: &BTreeMap, + refs: impl IntoIterator)> + 'a, +) -> Result, ResolveTablesError> { + let mut tables: BTreeMap, ResolvedTable>> = + BTreeMap::new(); + + for (table_name, table_ref) in refs { + match table_ref { + TableReference::Bare { .. } => { + return Err(ResolveTablesError::UnqualifiedTable { + table_name: table_name.clone(), + table_ref: table_ref.to_string(), + }); + } + TableReference::Partial { schema, table } => { + let dataset_ref = manifest_deps.get(schema.as_ref()).ok_or_else(|| { + ResolveTablesError::DependencyAliasNotFound { + table_name: table_name.clone(), + alias: schema.as_ref().clone(), + } + })?; + + let Entry::Vacant(entry) = tables + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(table_ref.clone()) + else { + continue; + }; + + let dataset = dataset_store + .get_dataset(dataset_ref) + .await + .map_err(|err| ResolveTablesError::GetDataset { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + source: err, + })?; + + let dataset_table = dataset + .tables + .iter() + .find(|t| t.name() == table) + .ok_or_else(|| ResolveTablesError::TableNotFoundInDataset { + table_name: table_name.clone(), + referenced_table_name: table.as_ref().clone(), + reference: dataset_ref.clone(), + })?; + + let resolved_table = ResolvedTable::new( + dataset_table.clone(), + schema.to_string(), + dataset_ref.clone(), + dataset.start_block, + ); + + entry.insert(resolved_table); + } + } + } + + Ok(tables + .into_values() + .flat_map(|map| map.into_values()) + .collect()) +} + +/// Resolves function references to ScalarUDF instances using pre-resolved dependencies. 
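+///
+/// Processes each function reference across all tables:
+/// - For external dependencies (dep.function): loads the dataset and retrieves its UDF
+/// - For self-references (self.function): creates a JsUdf from the manifest's function definition
+/// - Skips bare functions (assumed to be built-in DataFusion/Amp functions)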
+async fn resolve_udfs<'a>( + dataset_store: &impl DatasetAccess, + isolate_pool: IsolatePool, + manifest_deps: &BTreeMap, + manifest_udfs: &BTreeMap, + refs: impl IntoIterator)> + 'a, +) -> Result, ResolveUdfsError> { + let mut udfs: BTreeMap, ScalarUDF>> = + BTreeMap::new(); + let mut self_udfs: BTreeMap, ScalarUDF> = BTreeMap::new(); + + for (table_name, func_ref) in refs { + match func_ref { + FunctionReference::Bare { function: _ } => { + continue; + } + FunctionReference::Qualified { schema, function } => match schema.as_ref() { + DepAliasOrSelfRef::DepAlias(dep_alias) => { + let dataset_ref = manifest_deps.get(dep_alias).ok_or_else(|| { + ResolveUdfsError::DependencyAliasNotFound { + table_name: table_name.clone(), + alias: dep_alias.clone(), + } + })?; + + // Check vacancy BEFORE loading dataset + let Entry::Vacant(entry) = udfs + .entry(dataset_ref.hash().clone()) + .or_default() + .entry(func_ref.clone()) + else { + continue; + }; + + // Only load dataset if UDF not already resolved + let dataset = dataset_store + .get_dataset(dataset_ref) + .await + .map_err(|err| ResolveUdfsError::GetDataset { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + source: err, + })?; + + let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { + dataset_store + .eth_call_for_dataset(&schema.to_string(), &dataset) + .await + .map_err(|err| ResolveUdfsError::EthCallUdfCreation { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + source: err, + })? + .ok_or_else(|| ResolveUdfsError::EthCallNotAvailable { + table_name: table_name.clone(), + reference: dataset_ref.clone(), + })? + } else { + dataset + .function_by_name(schema.to_string(), function, IsolatePool::dummy()) + .ok_or_else(|| ResolveUdfsError::FunctionNotFoundInDataset { + table_name: table_name.clone(), + function_name: (**function).clone(), + reference: dataset_ref.clone(), + })? 
+ }; + + entry.insert(udf); + } + DepAliasOrSelfRef::SelfRef => { + let func_def = manifest_udfs.get(function).ok_or_else(|| { + ResolveUdfsError::SelfReferencedFunctionNotFound { + table_name: table_name.clone(), + function_name: (**function).clone(), + } + })?; + + let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else { + continue; + }; + + let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new( + isolate_pool.clone(), + Some(SELF_REF_KEYWORD.to_string()), + func_def.source.source.clone(), + func_def.source.filename.clone().into(), + Arc::from(function.as_ref().as_str()), + func_def + .input_types + .iter() + .map(|dt| dt.clone().into_arrow()) + .collect(), + func_def.output_type.clone().into_arrow(), + ))) + .into_scalar_udf(); + + entry.insert(udf); + } + }, + } + } + + Ok(self_udfs + .into_values() + .chain(udfs.into_values().flat_map(|map| map.into_values())) + .collect()) +} + +#[derive(Debug, thiserror::Error)] +pub enum ResolveTablesError { + /// Table is not qualified with a schema/dataset name + #[error( + "In table '{table_name}': Unqualified table '{table_ref}', all tables must be qualified with a dataset" + )] + UnqualifiedTable { + /// The table being processed when the error occurred + table_name: TableName, + /// The unqualified table reference string + table_ref: String, + }, + + /// Dependency alias not found when processing table reference + #[error( + "In table '{table_name}': Dependency alias '{alias}' referenced in table but not provided in dependencies" + )] + DependencyAliasNotFound { + /// The table being processed when the error occurred + table_name: TableName, + /// The dependency alias that was not found in the dependencies map + alias: DepAlias, + }, + + /// Failed to retrieve dataset from store when loading dataset for table reference + #[error("In table '{table_name}': Failed to retrieve dataset '{reference}'")] + GetDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Table not found in dataset + #[error( + "In table '{table_name}': Table '{referenced_table_name}' not found in dataset '{reference}'" + )] + TableNotFoundInDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The name of the table that was not found in the dataset + referenced_table_name: TableName, + /// The hash reference of the dataset where the table was not found + reference: HashReference, + }, +} + +#[derive(Debug, thiserror::Error)] +pub enum ResolveUdfsError { + /// Dependency alias not found when processing function reference + #[error( + "In table '{table_name}': Dependency alias '{alias}' referenced in function but not provided in dependencies" + )] + DependencyAliasNotFound { + /// The table being processed when the error occurred + table_name: TableName, + /// The dependency alias that was not found in the dependencies map + alias: DepAlias, + }, + + /// Failed to retrieve dataset from store when loading dataset for function + #[error("In table '{table_name}': Failed to retrieve dataset '{reference}' for function")] + GetDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset that failed to load + reference: HashReference, + #[source] + source: BoxError, + }, + + /// Failed to create ETH call UDF for dataset referenced in function name + #[error( + "In table '{table_name}': Failed to 
create ETH call UDF for dataset '{reference}' for function" + )] + EthCallUdfCreation { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset for which the eth_call UDF creation failed + reference: HashReference, + #[source] + source: BoxError, + }, + + /// eth_call function not available for dataset + #[error("In table '{table_name}': Function 'eth_call' not available for dataset '{reference}'")] + EthCallNotAvailable { + /// The table being processed when the error occurred + table_name: TableName, + /// The hash reference of the dataset that does not support eth_call + reference: HashReference, + }, + + /// Function not found in dataset + #[error( + "In table '{table_name}': Function '{function_name}' not found in dataset '{reference}'" + )] + FunctionNotFoundInDataset { + /// The table being processed when the error occurred + table_name: TableName, + /// The name of the function that was not found + function_name: FuncName, + /// The hash reference of the dataset where the function was not found + reference: HashReference, + }, + + /// Self-referenced function not found in manifest's functions map. + #[error( + "In table '{table_name}': Self-referenced function '{function_name}' not found in manifest functions" + )] + SelfReferencedFunctionNotFound { + /// The table containing the SQL query with the invalid reference + table_name: TableName, + /// The function name that was referenced but not defined + function_name: FuncName, + }, +} diff --git a/crates/core/datasets-common/src/manifest.rs b/crates/core/datasets-common/src/manifest.rs index d7197d1d9..e992ba2dd 100644 --- a/crates/core/datasets-common/src/manifest.rs +++ b/crates/core/datasets-common/src/manifest.rs @@ -176,3 +176,29 @@ pub struct Field { /// Whether the field can contain null values pub nullable: bool, } + +/// User-defined function specification. +/// +/// Defines a custom function with input/output types and implementation source. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "camelCase")] +pub struct Function { + // TODO: Support SQL type names, see https://datafusion.apache.org/user-guide/sql/data_types.html + /// Arrow data types for function input parameters + pub input_types: Vec, + /// Arrow data type for function return value + pub output_type: DataType, + /// Function implementation source code and metadata + pub source: FunctionSource, +} + +/// Source code and metadata for a user-defined function. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +pub struct FunctionSource { + /// Function implementation source code + pub source: Arc, + /// Filename where the function is defined + pub filename: String, +} diff --git a/crates/core/datasets-derived/src/catalog.rs b/crates/core/datasets-derived/src/catalog.rs deleted file mode 100644 index 5eda6c555..000000000 --- a/crates/core/datasets-derived/src/catalog.rs +++ /dev/null @@ -1,902 +0,0 @@ -//! Catalog functions for derived datasets -//! -//! This module contains catalog creation functions that handle derived dataset dependencies. -//! These functions work with `DepAlias`, `DepReference`, and `FuncName` types that are specific -//! to derived datasets. -//! -//! ## SQL Catalog Functions -//! -//! SQL catalog functions provide catalog creation functions that work with pre-resolved -//! 
dataset dependencies (DepAlias → Hash mappings) for deterministic, reproducible -//! derived dataset execution. - -use std::{ - collections::{BTreeMap, btree_map::Entry}, - sync::Arc, -}; - -use amp_data_store::DataStore; -use common::{ - BoxError, PlanningContext, ResolvedTable, - catalog::{ - dataset_access::DatasetAccess, - logical::LogicalCatalog, - physical::{Catalog, PhysicalTable}, - }, - js_udf::JsUdf, - query_context::QueryEnv, - sql::{ - FunctionReference, ResolveFunctionReferencesError, ResolveTableReferencesError, - TableReference, resolve_function_references, resolve_table_references, - }, -}; -use datafusion::{ - logical_expr::{ScalarUDF, async_udf::AsyncScalarUDF}, - sql::parser::Statement, -}; -use datasets_common::{ - deps::alias::{DepAlias, DepAliasError, DepAliasOrSelfRef, DepAliasOrSelfRefError}, - func_name::{ETH_CALL_FUNCTION_NAME, FuncName}, - hash::Hash, - hash_reference::HashReference, - table_name::TableName, -}; -use js_runtime::isolate_pool::IsolatePool; - -use crate::manifest::Function; - -pub async fn catalog_for_sql_with_deps( - store: &impl DatasetAccess, - data_store: &DataStore, - query: &Statement, - env: &QueryEnv, - dependencies: &BTreeMap, - functions: &BTreeMap, -) -> Result { - let table_refs = resolve_table_references::(query) - .map_err(CatalogForSqlWithDepsError::TableReferenceResolution)?; - let func_refs = resolve_function_references::(query) - .map_err(CatalogForSqlWithDepsError::FunctionReferenceResolution)?; - - get_physical_catalog_with_deps( - store, - data_store, - table_refs, - func_refs, - env, - dependencies, - functions, - ) - .await - .map_err(CatalogForSqlWithDepsError::GetPhysicalCatalogWithDeps) -} - -async fn get_physical_catalog_with_deps( - dataset_store: &impl DatasetAccess, - data_store: &DataStore, - table_refs: impl IntoIterator>, - function_refs: impl IntoIterator>, - env: &QueryEnv, - dependencies: &BTreeMap, - functions: &BTreeMap, -) -> Result { - let logical_catalog = get_logical_catalog_with_deps_and_funcs( - dataset_store, - table_refs, - function_refs, - &env.isolate_pool, - dependencies, - functions, - ) - .await - .map_err(GetPhysicalCatalogWithDepsError::GetLogicalCatalogWithDepsAndFuncs)?; - - let mut tables = Vec::new(); - for table in &logical_catalog.tables { - let dataset_ref = table.dataset_reference(); - let table_name = table.name(); - - let revision = data_store - .get_table_active_revision(dataset_ref, table_name) - .await - .map_err( - |err| GetPhysicalCatalogWithDepsError::PhysicalTableRetrieval { - dataset: dataset_ref.clone(), - table: table_name.clone(), - source: err, - }, - )? - .ok_or(GetPhysicalCatalogWithDepsError::TableNotSynced { - dataset: dataset_ref.clone(), - table: table_name.clone(), - })?; - - let physical_table = PhysicalTable::from_active_revision( - data_store.clone(), - table.dataset_reference().clone(), - table.dataset_start_block(), - table.table().clone(), - revision, - table.sql_table_ref_schema().to_string(), - ); - tables.push(physical_table.into()); - } - Ok(Catalog::new(tables, logical_catalog)) -} - -/// Internal helper that builds a logical catalog from table references and function names. -/// -/// This function resolves dataset references, loads dataset metadata, and creates UDFs -/// for the referenced datasets. It builds the logical layer of the catalog without -/// accessing physical data locations. 
-/// -/// ## Where Used -/// -/// Called internally by: -/// - `get_physical_catalog` (which is called by `catalog_for_sql`) -async fn get_logical_catalog_with_deps_and_funcs( - store: &impl DatasetAccess, - table_refs: impl IntoIterator>, - func_refs: impl IntoIterator>, - isolate_pool: &IsolatePool, - dependencies: &BTreeMap, - functions: &BTreeMap, -) -> Result { - let table_refs = table_refs.into_iter().collect::>(); - let func_refs = func_refs.into_iter().collect::>(); - - // Use hash-based map to deduplicate datasets and collect resolved tables - // Inner map: table_ref -> ResolvedTable (deduplicates table references) - let mut tables: BTreeMap> = BTreeMap::new(); - // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference - // Inner map ensures deduplication: multiple function references to the same UDF share one instance - let mut udfs: BTreeMap> = BTreeMap::new(); - // Track UDFs defined in this manifest (bare functions and self-references) - separate from dependency functions - // Ensures deduplication: multiple references to the same function share one instance - let mut self_udfs: BTreeMap, ScalarUDF> = BTreeMap::new(); - - // Part 1: Process table references - for table_ref in table_refs { - match &table_ref { - TableReference::Bare { .. } => { - return Err(GetLogicalCatalogWithDepsAndFuncsError::UnqualifiedTable { - table_ref: table_ref.to_string(), - }); - } - TableReference::Partial { schema, table } => { - // Schema is already parsed as DepAlias, lookup in dependencies map - let hash_ref = dependencies.get(schema.as_ref()).ok_or_else(|| { - GetLogicalCatalogWithDepsAndFuncsError::DependencyAliasNotFoundForTableRef { - alias: schema.as_ref().clone(), - } - })?; - - // Convert table_ref to use String schema for internal data structures - let table_ref_string = - TableReference::partial(schema.to_string(), table.as_ref().clone()); - - // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) - let Entry::Vacant(entry) = tables - .entry(hash_ref.hash().clone()) - .or_default() - .entry(table_ref_string.clone()) - else { - continue; - }; - - // Load dataset by hash (cached by store) - let dataset = store.get_dataset(hash_ref).await.map_err(|err| { - GetLogicalCatalogWithDepsAndFuncsError::GetDatasetForTableRef { - reference: hash_ref.clone(), - source: err, - } - })?; - - // Find table in dataset - let dataset_table = dataset - .tables - .iter() - .find(|t| t.name() == table) - .ok_or_else(|| { - GetLogicalCatalogWithDepsAndFuncsError::TableNotFoundInDataset { - table_name: table.as_ref().clone(), - reference: hash_ref.clone(), - } - })?; - - // Create ResolvedTable - let resolved_table = ResolvedTable::new( - dataset_table.clone(), - schema.to_string(), - hash_ref.clone(), - dataset.start_block, - ); - - // Insert into vacant entry - entry.insert(resolved_table); - } - } - } - - // Part 2: Process function references (load datasets for qualified UDFs, create UDFs for bare functions) - for func_ref in func_refs { - match &func_ref { - // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion) - FunctionReference::Bare { function: _ } => continue, - FunctionReference::Qualified { schema, function } => { - // Match on schema type: DepAlias (external dependency) or SelfRef (same-dataset function) - match schema.as_ref() { - DepAliasOrSelfRef::DepAlias(dep_alias) => { - // External dependency reference - lookup in dependencies map - let hash_ref = 
dependencies.get(dep_alias).ok_or_else(|| { - GetLogicalCatalogWithDepsAndFuncsError::DependencyAliasNotFoundForFunctionRef { - alias: dep_alias.clone(), - } - })?; - - // Load dataset by hash (cached by store) - let dataset = store.get_dataset(hash_ref).await.map_err(|err| { - GetLogicalCatalogWithDepsAndFuncsError::GetDatasetForFunction { - reference: hash_ref.clone(), - source: err, - } - })?; - - // Convert func_ref to use String schema for internal data structures - let func_ref_string = FunctionReference::qualified( - schema.to_string(), - function.as_ref().clone(), - ); - - // Skip if function reference is already resolved (optimization to avoid redundant UDF creation) - let Entry::Vacant(entry) = udfs - .entry(hash_ref.hash().clone()) - .or_default() - .entry(func_ref_string) - else { - continue; - }; - - // Get the UDF for this function reference - let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { - store - .eth_call_for_dataset(&schema.to_string(), &dataset) - .await - .map_err(|err| { - GetLogicalCatalogWithDepsAndFuncsError::EthCallUdfCreationForFunction { - reference: hash_ref.clone(), - source: err, - } - })? - .ok_or_else(|| GetLogicalCatalogWithDepsAndFuncsError::EthCallNotAvailable { - reference: hash_ref.clone(), - })? - } else { - dataset - .function_by_name(schema.to_string(), function, isolate_pool.clone()) - .ok_or_else(|| { - GetLogicalCatalogWithDepsAndFuncsError::FunctionNotFoundInDataset { - function_name: func_ref.to_string(), - reference: hash_ref.clone(), - } - })? - }; - - entry.insert(udf); - } - DepAliasOrSelfRef::SelfRef => { - // Same-dataset function reference (self.function_name) - // Look up function in the functions map (defined in this dataset) - if let Some(func_def) = functions.get(function) { - // Skip if function reference is already resolved (optimization) - let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else { - continue; - }; - - // Create UDF from Function definition using JsUdf - // Use "self" as schema qualifier to preserve case sensitivity - let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new( - isolate_pool.clone(), - Some(datasets_common::deps::alias::SELF_REF_KEYWORD.to_string()), // Schema = "self" - func_def.source.source.clone(), - func_def.source.filename.clone().into(), - Arc::from(function.as_ref().as_str()), - func_def - .input_types - .iter() - .map(|dt| dt.clone().into_arrow()) - .collect(), - func_def.output_type.clone().into_arrow(), - ))) - .into_scalar_udf(); - - entry.insert(udf); - } - // If function not in functions map, it's an error (self.function should always be defined) - // TODO: Add proper error variant for this case - } - } - } - } - } - - Ok(LogicalCatalog { - tables: tables - .into_values() - .flat_map(|map| map.into_values()) - .collect(), - udfs: self_udfs - .into_values() - .chain(udfs.into_values().flat_map(|map| map.into_values())) - .collect(), - }) -} - -/// Type alias for the table references map used in multi-table validation -/// -/// Maps table names to their SQL references (table refs and function refs) using dependency aliases or self-references. -type TableReferencesMap = BTreeMap< - TableName, - ( - Vec>, - Vec>, - ), ->; - -/// Creates a planning context for multi-table schema validation with pre-resolved dependencies. -/// -/// This function validates dataset manifests by building logical catalogs for multiple -/// tables simultaneously, using dependencies that have been pre-resolved with aliases -/// by the caller. 
-/// -/// ## Where Used -/// -/// This function is used in three manifest validation paths: -/// -/// 1. **Schema Endpoint** (`crates/services/admin-api/src/handlers/schema.rs`): -/// - Called via `POST /schema` endpoint from TypeScript CLI (`amp register`) -/// - Validates SQL in dataset manifests during interactive schema generation -/// - Returns schemas for manifest generation without accessing physical data -/// -/// 2. **Manifest Registration** (`crates/services/admin-api/src/handlers/manifests/register.rs`): -/// - Called via `POST /manifests` endpoint during content-addressable manifest registration -/// - Validates derived dataset manifests via `datasets_derived::validate()` -/// - Ensures all SQL queries, dependencies, and table references are valid -/// - Stores validated manifests in content-addressable storage without dataset linking -/// -/// 3. **Dataset Registration** (`crates/services/admin-api/src/handlers/datasets/register.rs`): -/// - Called via `POST /datasets` endpoint during dataset registration -/// - Validates derived dataset manifests via `datasets_derived::validate()` -/// - Ensures all SQL queries, dependencies, and table references are valid -/// - Prevents invalid manifests from being registered and linked to dataset versions -/// -/// ## Implementation -/// -/// Unlike `planning_ctx_for_sql`, this function: -/// 1. Accepts pre-resolved dependencies with aliases from the API request -/// 2. Processes multiple tables simultaneously (batch validation) -/// 3. Maps table references to user-provided dependency aliases -/// 4. Builds a unified logical catalog for all tables -/// 5. Returns planning context for schema validation only -/// -/// This function does not access physical parquet files or the metadata database, -/// making it suitable for fast manifest validation during dataset registration. -/// -/// ## Function Handling -/// -/// Bare (unqualified) function references are handled as follows: -/// - If the function is defined in the `functions` parameter, a UDF is created for it -/// - If the function is not defined, it's assumed to be a built-in function (logged as debug) -/// - TODO: Add validation against DataFusion built-in functions to catch typos -pub async fn planning_ctx_for_sql_tables_with_deps_and_funcs( - store: &impl DatasetAccess, - references: TableReferencesMap, - dependencies: BTreeMap, - functions: BTreeMap, - isolate_pool: IsolatePool, -) -> Result { - // Use hash-based map to deduplicate datasets across ALL tables - // Inner map: table_ref -> ResolvedTable (deduplicates table references) - let mut tables: BTreeMap> = BTreeMap::new(); - // Track UDFs from external dependencies - outer key: dataset hash, inner key: function reference - // Inner map ensures deduplication: multiple function references to the same UDF share one instance - let mut udfs: BTreeMap> = BTreeMap::new(); - // Track UDFs defined in this manifest (bare functions and self-references) - separate from dependency functions - // Ensures deduplication: multiple references to the same function share one instance - let mut self_udfs: BTreeMap, ScalarUDF> = BTreeMap::new(); - - // Process all tables - fail fast on first error - for (table_name, (table_refs, func_refs)) in references { - // Part 1: Process table references for this table - for table_ref in table_refs { - match &table_ref { - TableReference::Bare { .. 
} => { - return Err(PlanningCtxForSqlTablesWithDepsError::UnqualifiedTable { - table_name: table_name.clone(), - table_ref: table_ref.to_string(), - }); - } - TableReference::Partial { schema, table } => { - // Schema is already parsed as DepAlias, lookup in dependencies map - let hash_ref = dependencies.get(schema.as_ref()).ok_or_else(|| { - PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForTableRef { - table_name: table_name.clone(), - alias: schema.as_ref().clone(), - } - })?; - - // Skip if table reference is already resolved (optimization to avoid redundant dataset loading) - let Entry::Vacant(entry) = tables - .entry(hash_ref.hash().clone()) - .or_default() - .entry(table_ref.to_string_reference()) - else { - continue; - }; - - // Load dataset by hash (cached by store) - let dataset = store.get_dataset(hash_ref).await.map_err(|err| { - PlanningCtxForSqlTablesWithDepsError::GetDatasetForTableRef { - table_name: table_name.clone(), - reference: hash_ref.clone(), - source: err, - } - })?; - - // Find table in dataset - let dataset_table = dataset - .tables - .iter() - .find(|t| t.name() == table) - .ok_or_else(|| { - PlanningCtxForSqlTablesWithDepsError::TableNotFoundInDataset { - table_name: table_name.clone(), - referenced_table_name: table.as_ref().clone(), - reference: hash_ref.clone(), - } - })?; - - // Create ResolvedTable - let resolved_table = ResolvedTable::new( - dataset_table.clone(), - schema.to_string(), - hash_ref.clone(), - dataset.start_block, - ); - - // Insert into vacant entry - entry.insert(resolved_table); - } - } - } - - // Part 2: Process function references for this table (load datasets for qualified UDFs only) - for func_ref in func_refs { - match &func_ref { - // Skip bare functions - they are assumed to be built-in functions (Amp or DataFusion) - FunctionReference::Bare { function: _ } => { - continue; - } - FunctionReference::Qualified { schema, function } => { - // Match on schema type: DepAlias (external dependency) or SelfRef (same-dataset function) - match schema.as_ref() { - DepAliasOrSelfRef::DepAlias(dep_alias) => { - // External dependency reference - lookup in dependencies map - let hash_ref = dependencies.get(dep_alias).ok_or_else(|| { - PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForFunctionRef { - table_name: table_name.clone(), - alias: dep_alias.clone(), - } - })?; - - // Load dataset by hash (cached by store) - let dataset = store.get_dataset(hash_ref).await.map_err(|err| { - PlanningCtxForSqlTablesWithDepsError::GetDatasetForFunction { - table_name: table_name.clone(), - reference: hash_ref.clone(), - source: err, - } - })?; - - // Skip if function reference is already resolved (optimization to avoid redundant UDF creation) - let Entry::Vacant(entry) = udfs - .entry(hash_ref.hash().clone()) - .or_default() - .entry(func_ref.to_string_reference()) - else { - continue; - }; - - // Get the UDF for this function reference - let udf = if function.as_ref() == ETH_CALL_FUNCTION_NAME { - store - .eth_call_for_dataset(&schema.to_string(), &dataset) - .await - .map_err(|err| { - PlanningCtxForSqlTablesWithDepsError::EthCallUdfCreationForFunction { - table_name: table_name.clone(), - reference: hash_ref.clone(), - source: err, - } - })? - .ok_or_else(|| { - PlanningCtxForSqlTablesWithDepsError::EthCallNotAvailable { - table_name: table_name.clone(), - reference: hash_ref.clone(), - } - })? 
- } else { - dataset - .function_by_name(schema.to_string(), function, IsolatePool::dummy()) - .ok_or_else(|| { - PlanningCtxForSqlTablesWithDepsError::FunctionNotFoundInDataset { - table_name: table_name.clone(), - function_name: func_ref.to_string(), - reference: hash_ref.clone(), - } - })? - }; - - entry.insert(udf); - } - DepAliasOrSelfRef::SelfRef => { - // Same-dataset function reference (self.function_name) - // Look up function in the functions map (defined in this dataset) - if let Some(func_def) = functions.get(function) { - // Skip if function reference is already resolved (optimization) - let Entry::Vacant(entry) = self_udfs.entry(func_ref.clone()) else { - continue; - }; - - // Create UDF from Function definition using JsUdf - // Use "self" as schema qualifier to preserve case sensitivity - let udf = AsyncScalarUDF::new(Arc::new(JsUdf::new( - isolate_pool.clone(), - Some( - datasets_common::deps::alias::SELF_REF_KEYWORD.to_string(), - ), // Schema = "self" - func_def.source.source.clone(), - func_def.source.filename.clone().into(), - Arc::from(function.as_ref().as_str()), - func_def - .input_types - .iter() - .map(|dt| dt.clone().into_arrow()) - .collect(), - func_def.output_type.clone().into_arrow(), - ))) - .into_scalar_udf(); - - entry.insert(udf); - } else { - // Function not in functions map - this is an error for self-references - tracing::error!( - table=%table_name, - function=%function.as_ref(), - "Self-referenced function not defined in functions map" - ); - // TODO: Add proper error variant for this case - } - } - } - } - } - } - } - - // Flatten to Vec and create single unified planning context - // Extract values from nested BTreeMap structure - Ok(PlanningContext::new(LogicalCatalog { - tables: tables - .into_values() - .flat_map(|map| map.into_values()) - .collect(), - udfs: self_udfs - .into_values() - .chain(udfs.into_values().flat_map(|map| map.into_values())) - .collect(), - })) -} - -// ================================================================================================ -// Error Types -// ================================================================================================ -// -// Error types for derived dataset catalog operations. -// These error types were moved from `common::catalog::errors` to break the circular -// dependency between `common` and `datasets-derived`. - -#[derive(Debug, thiserror::Error)] -pub enum PlanningCtxForSqlTablesWithDepsError { - /// Table is not qualified with a schema/dataset name. - /// - /// All tables must be qualified with a dataset reference in the schema portion. - /// Unqualified tables (e.g., just `table_name`) are not allowed. - #[error( - "In table '{table_name}': Unqualified table '{table_ref}', all tables must be qualified with a dataset" - )] - UnqualifiedTable { - table_name: TableName, - table_ref: String, - }, - - /// Failed to retrieve dataset from store when loading dataset for table reference. - /// - /// This occurs when loading a dataset definition fails: - /// - Dataset not found in the store - /// - Dataset manifest is invalid or corrupted - /// - Unsupported dataset kind - /// - Storage backend errors when reading the dataset - #[error("In table '{table_name}': Failed to retrieve dataset '{reference}'")] - GetDatasetForTableRef { - table_name: TableName, - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Dependency alias not found when processing table reference. 
- /// - /// This occurs when a table reference uses an alias that was not provided - /// in the dependencies map. - #[error( - "In table '{table_name}': Dependency alias '{alias}' referenced in table but not provided in dependencies" - )] - DependencyAliasNotFoundForTableRef { - table_name: TableName, - alias: DepAlias, - }, - - /// Dependency alias not found when processing function reference. - /// - /// This occurs when a function reference uses an alias that was not provided - /// in the dependencies map. - #[error( - "In table '{table_name}': Dependency alias '{alias}' referenced in function but not provided in dependencies" - )] - DependencyAliasNotFoundForFunctionRef { - table_name: TableName, - alias: DepAlias, - }, - - /// Failed to retrieve dataset from store when loading dataset for function. - /// - /// This occurs when loading a dataset definition for a function fails: - /// - Dataset not found in the store - /// - Dataset manifest is invalid or corrupted - /// - Unsupported dataset kind - /// - Storage backend errors when reading the dataset - #[error("In table '{table_name}': Failed to retrieve dataset '{reference}' for function")] - GetDatasetForFunction { - table_name: TableName, - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Failed to create ETH call UDF for dataset referenced in function name. - /// - /// This occurs when creating the eth_call user-defined function for a function fails: - /// - Invalid provider configuration for the dataset - /// - Provider connection issues - /// - Dataset is not an EVM RPC dataset but eth_call was requested - #[error( - "In table '{table_name}': Failed to create ETH call UDF for dataset '{reference}' for function" - )] - EthCallUdfCreationForFunction { - table_name: TableName, - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Function not found in dataset. - /// - /// This occurs when a function is referenced in the SQL query but the - /// dataset does not contain a function with that name. - #[error( - "In table '{table_name}': Function '{function_name}' not found in dataset '{reference}'" - )] - FunctionNotFoundInDataset { - table_name: TableName, - function_name: String, - reference: HashReference, - }, - - /// eth_call function not available for dataset. - /// - /// This occurs when the eth_call function is referenced in SQL but the - /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured). - #[error("In table '{table_name}': Function 'eth_call' not available for dataset '{reference}'")] - EthCallNotAvailable { - table_name: TableName, - reference: HashReference, - }, - - /// Table not found in dataset. - /// - /// This occurs when the table name is referenced in the SQL query but the - /// dataset does not contain a table with that name. - #[error( - "In table '{table_name}': Table '{referenced_table_name}' not found in dataset '{reference}'" - )] - TableNotFoundInDataset { - table_name: TableName, - referenced_table_name: TableName, - reference: HashReference, - }, -} - -/// Errors specific to planning_ctx_for_sql operations -/// -/// This error type is used exclusively by `planning_ctx_for_sql()` to create -/// a planning context for SQL queries without requiring physical data to exist. - -#[derive(Debug, thiserror::Error)] -pub enum CatalogForSqlWithDepsError { - /// Failed to resolve table references from the SQL statement. 
- /// - /// This occurs when: - /// - Table references contain invalid identifiers - /// - Table references have unsupported format (not 1-3 parts) - /// - Table names don't conform to identifier rules - /// - Schema portion fails to parse as DepAlias - #[error("Failed to resolve table references from SQL")] - TableReferenceResolution(#[source] ResolveTableReferencesError), - - /// Failed to extract function names from the SQL statement. - /// - /// This occurs when: - /// - The SQL statement contains DML operations (CreateExternalTable, CopyTo) - /// - An EXPLAIN statement wraps an unsupported statement type - /// - Schema portion fails to parse as DepAlias or self reference - #[error("Failed to resolve function references from SQL")] - FunctionReferenceResolution(#[source] ResolveFunctionReferencesError), - - /// Failed to get the physical catalog for the resolved tables and functions. - /// - /// This wraps errors from `get_physical_catalog_with_deps`, which can occur when: - /// - Dataset retrieval fails - /// - Physical table metadata cannot be retrieved - /// - Tables have not been synced - /// - Dependency aliases are invalid or not found - #[error("Failed to get physical catalog with dependencies: {0}")] - GetPhysicalCatalogWithDeps(#[source] GetPhysicalCatalogWithDepsError), -} - -/// Errors specific to get_physical_catalog_with_deps operations - -#[derive(Debug, thiserror::Error)] -pub enum GetPhysicalCatalogWithDepsError { - /// Failed to get the logical catalog with dependencies and functions. - /// - /// This wraps errors from `get_logical_catalog_with_deps_and_funcs`, which can occur when: - /// - Dataset names cannot be extracted from table references or function names - /// - Dataset retrieval fails - /// - UDF creation fails - /// - Dependency aliases are invalid or not found - #[error("Failed to get logical catalog with dependencies and functions: {0}")] - GetLogicalCatalogWithDepsAndFuncs(#[source] GetLogicalCatalogWithDepsAndFuncsError), - - /// Failed to retrieve physical table metadata from the metadata database. - /// - /// This occurs when querying the metadata database for the active physical - /// location of a table fails due to database connection issues, query errors, - /// or other database-related problems. - #[error("Failed to retrieve physical table metadata for table {dataset}.{table}")] - PhysicalTableRetrieval { - dataset: HashReference, - table: TableName, - #[source] - source: amp_data_store::GetTableActiveRevisionError, - }, - - /// Table has not been synced and no physical location exists. - /// - /// This occurs when attempting to load a physical catalog for a table that - /// has been defined but has not yet been dumped/synced to storage. The table - /// exists in the dataset definition but has no physical parquet files. - #[error("Table {dataset}.{table} has not been synced")] - TableNotSynced { - dataset: HashReference, - table: TableName, - }, -} - -/// Errors specific to get_logical_catalog_with_deps_and_funcs operations -#[derive(Debug, thiserror::Error)] -#[allow(clippy::large_enum_variant)] -pub enum GetLogicalCatalogWithDepsAndFuncsError { - /// Table is not qualified with a schema/dataset name. - /// - /// All tables must be qualified with a dataset reference in the schema portion. - /// Unqualified tables (e.g., just `table_name`) are not allowed. 
- #[error("Unqualified table '{table_ref}', all tables must be qualified with a dataset")] - UnqualifiedTable { table_ref: String }, - - /// Dependency alias not found when processing table reference. - /// - /// This occurs when a table reference uses an alias that was not provided - /// in the dependencies map. - #[error( - "Dependency alias '{alias}' referenced in table reference but not provided in dependencies" - )] - DependencyAliasNotFoundForTableRef { alias: DepAlias }, - - /// Failed to retrieve dataset from store when loading dataset for table reference. - /// - /// This occurs when loading a dataset definition fails: - /// - Dataset not found in the store - /// - Dataset manifest is invalid or corrupted - /// - Unsupported dataset kind - /// - Storage backend errors when reading the dataset - #[error("Failed to retrieve dataset '{reference}' for table reference")] - GetDatasetForTableRef { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Dependency alias not found when processing function reference. - /// - /// This occurs when a function reference uses an alias that was not provided - /// in the dependencies map. - #[error( - "Dependency alias '{alias}' referenced in function reference but not provided in dependencies" - )] - DependencyAliasNotFoundForFunctionRef { alias: DepAlias }, - - /// Failed to retrieve dataset from store when loading dataset for function. - /// - /// This occurs when loading a dataset definition for a function fails: - /// - Dataset not found in the store - /// - Dataset manifest is invalid or corrupted - /// - Unsupported dataset kind - /// - Storage backend errors when reading the dataset - #[error("Failed to retrieve dataset '{reference}' for function reference")] - GetDatasetForFunction { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// Failed to create ETH call UDF for dataset referenced in function name. - /// - /// This occurs when creating the eth_call user-defined function for a function fails: - /// - Invalid provider configuration for the dataset - /// - Provider connection issues - /// - Dataset is not an EVM RPC dataset but eth_call was requested - #[error("Failed to create ETH call UDF for dataset '{reference}' for function reference")] - EthCallUdfCreationForFunction { - reference: HashReference, - #[source] - source: BoxError, - }, - - /// eth_call function not available for dataset. - /// - /// This occurs when the eth_call function is referenced in SQL but the - /// dataset does not support eth_call (not an EVM RPC dataset or no provider configured). - #[error("Function 'eth_call' not available for dataset '{reference}'")] - EthCallNotAvailable { reference: HashReference }, - - /// Function not found in dataset. - /// - /// This occurs when a function is referenced in the SQL query but the - /// dataset does not contain a function with that name. - #[error("Function '{function_name}' not found in dataset '{reference}'")] - FunctionNotFoundInDataset { - function_name: String, - reference: HashReference, - }, - - /// Table not found in dataset. - /// - /// This occurs when the table name is referenced in the SQL query but the - /// dataset does not contain a table with that name. 
- #[error("Table '{table_name}' not found in dataset '{reference}'")] - TableNotFoundInDataset { - table_name: TableName, - reference: HashReference, - }, -} diff --git a/crates/core/datasets-derived/src/lib.rs b/crates/core/datasets-derived/src/lib.rs index 719e7420b..0c577ff69 100644 --- a/crates/core/datasets-derived/src/lib.rs +++ b/crates/core/datasets-derived/src/lib.rs @@ -12,7 +12,6 @@ //! //! See [`manifest::Manifest`] for the complete derived dataset specification. -pub mod catalog; mod dataset_kind; pub mod logical; pub mod manifest; diff --git a/crates/core/datasets-derived/src/logical.rs b/crates/core/datasets-derived/src/logical.rs index 83e0f3fb8..1e032f24f 100644 --- a/crates/core/datasets-derived/src/logical.rs +++ b/crates/core/datasets-derived/src/logical.rs @@ -18,11 +18,16 @@ use common::{ catalog::{ dataset_access::DatasetAccess, logical::{Function as LogicalFunction, FunctionSource as LogicalFunctionSource}, + validation_derived_dataset::{ + self as catalog, CreateLogicalCatalogError, ResolveTablesError, ResolveUdfsError, + TableReferencesMap, + }, }, + planning_context::PlanningContext, query_context::Error as QueryContextErr, sql::{ - FunctionReference, ResolveFunctionReferencesError, ResolveTableReferencesError, - TableReference, resolve_function_references, resolve_table_references, + ResolveFunctionReferencesError, ResolveTableReferencesError, TableReference, + resolve_function_references, resolve_table_references, }, utils::dfs, }; @@ -36,9 +41,6 @@ use js_runtime::isolate_pool::IsolatePool; use crate::{ DerivedDatasetKind, Manifest, - catalog::{ - PlanningCtxForSqlTablesWithDepsError, planning_ctx_for_sql_tables_with_deps_and_funcs, - }, manifest::{TableInput, View}, }; @@ -281,17 +283,6 @@ pub struct TableDependencySortError { pub table_name: TableName, } -/// Type alias for the table references map used in multi-table validation -/// -/// Maps table names to their SQL references (table refs and function refs) using dependency aliases or self-references. -type TableReferencesMap = BTreeMap< - TableName, - ( - Vec>, - Vec>, - ), ->; - /// Validates a derived dataset manifest with comprehensive checks. /// /// This function performs deep validation of a manifest by: @@ -329,10 +320,9 @@ pub async fn validate( reference: reference.to_string(), source: err, })? 
- .ok_or_else(|| ManifestValidationError::DependencyResolution { + .ok_or_else(|| ManifestValidationError::DependencyNotFound { alias: alias.to_string(), reference: reference.to_string(), - source: format!("Dependency '{}' not found", reference).into(), })?; dependencies.insert(alias.clone(), reference); @@ -340,8 +330,8 @@ pub async fn validate( // Check if the start block is before the earliest available block of the dependencies if let Some(dataset_start_block) = &manifest.start_block { - for (alias, hash_ref) in &dependencies { - let dataset = store.get_dataset(hash_ref).await.map_err(|err| { + for (alias, dataset_ref) in &dependencies { + let dataset = store.get_dataset(dataset_ref).await.map_err(|err| { ManifestValidationError::FetchDependencyDataset { alias: alias.to_string(), source: err, @@ -407,45 +397,49 @@ pub async fn validate( // - Bare function references can be created as UDFs or are assumed to be built-ins // - Table references use valid dataset aliases from dependencies // - Schema compatibility across dependencies - let planning_ctx = planning_ctx_for_sql_tables_with_deps_and_funcs( + let planning_ctx = catalog::create( store, - references, + IsolatePool::dummy(), // For manifest validation only (no JS execution) dependencies, manifest.functions.clone(), - IsolatePool::dummy(), // For manifest validation only (no JS execution) + references, ) .await + .map(PlanningContext::new) .map_err(|err| match &err { - PlanningCtxForSqlTablesWithDepsError::UnqualifiedTable { .. } => { - ManifestValidationError::UnqualifiedTable(err) - } - PlanningCtxForSqlTablesWithDepsError::GetDatasetForTableRef { .. } => { - ManifestValidationError::GetDataset(err) - } - PlanningCtxForSqlTablesWithDepsError::GetDatasetForFunction { .. } => { - ManifestValidationError::GetDataset(err) - } - PlanningCtxForSqlTablesWithDepsError::EthCallUdfCreationForFunction { .. } => { - ManifestValidationError::EthCallUdfCreation(err) - } - PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForTableRef { .. } => { - ManifestValidationError::DependencyAliasNotFound(err) - } - PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForFunctionRef { .. } => { - ManifestValidationError::DependencyAliasNotFound(err) - } - PlanningCtxForSqlTablesWithDepsError::TableNotFoundInDataset { .. } => { - ManifestValidationError::TableNotFoundInDataset(err) - } - PlanningCtxForSqlTablesWithDepsError::FunctionNotFoundInDataset { .. } => { - ManifestValidationError::FunctionNotFoundInDataset(err) - } - PlanningCtxForSqlTablesWithDepsError::EthCallNotAvailable { .. } => { - ManifestValidationError::EthCallNotAvailable(err) - } + CreateLogicalCatalogError::ResolveTables(resolve_error) => match resolve_error { + ResolveTablesError::UnqualifiedTable { .. } => { + ManifestValidationError::UnqualifiedTable(err) + } + ResolveTablesError::DependencyAliasNotFound { .. } => { + ManifestValidationError::DependencyAliasNotFound(err) + } + ResolveTablesError::GetDataset { .. } => ManifestValidationError::GetDataset(err), + ResolveTablesError::TableNotFoundInDataset { .. } => { + ManifestValidationError::TableNotFoundInDataset(err) + } + }, + CreateLogicalCatalogError::ResolveUdfs(resolve_error) => match resolve_error { + ResolveUdfsError::DependencyAliasNotFound { .. } => { + ManifestValidationError::DependencyAliasNotFound(err) + } + ResolveUdfsError::GetDataset { .. } => ManifestValidationError::GetDataset(err), + ResolveUdfsError::EthCallUdfCreation { .. 
} => { + ManifestValidationError::EthCallUdfCreation(err) + } + ResolveUdfsError::EthCallNotAvailable { .. } => { + ManifestValidationError::EthCallNotAvailable(err) + } + ResolveUdfsError::FunctionNotFoundInDataset { .. } => { + ManifestValidationError::FunctionNotFoundInDataset(err) + } + ResolveUdfsError::SelfReferencedFunctionNotFound { .. } => { + ManifestValidationError::FunctionNotFoundInDataset(err) + } + }, })?; - // Step 4: Validate that all table SQL queries are incremental + // Step 4: Validate that all table SQL queries are incremental. // Incremental processing is required for derived datasets to efficiently update // as new blocks arrive. This check ensures no non-incremental operations are used. for (table_name, table) in &manifest.tables { @@ -489,6 +483,7 @@ pub enum ManifestValidationError { /// - Query parsing fails for other reasons #[error("Invalid SQL query for table '{table_name}': {source}")] InvalidTableSql { + /// The table whose SQL query is invalid table_name: TableName, #[source] source: common::sql::ParseSqlError, @@ -497,6 +492,7 @@ pub enum ManifestValidationError { /// Failed to resolve table references from SQL query #[error("Failed to resolve table references in table '{table_name}': {source}")] TableReferenceResolution { + /// The table whose SQL query contains unresolvable table references table_name: TableName, #[source] source: ResolveTableReferencesError, @@ -505,6 +501,7 @@ pub enum ManifestValidationError { /// Failed to resolve function references from SQL query #[error("Failed to resolve function references in table '{table_name}': {source}")] FunctionReferenceResolution { + /// The table whose SQL query contains unresolvable function references table_name: TableName, #[source] source: ResolveFunctionReferencesError, @@ -512,18 +509,30 @@ pub enum ManifestValidationError { /// Failed to resolve dependency reference to hash /// - /// This occurs when resolving a dependency reference fails: + /// This occurs when resolving a dependency reference fails due to: /// - Invalid reference format - /// - Dependency not found in the dataset store /// - Storage backend errors when reading the dependency #[error("Failed to resolve dependency '{alias}' ({reference}): {source}")] DependencyResolution { + /// The dependency alias used in the manifest alias: String, + /// The dataset reference string (e.g., "dataset@version" or "dataset@hash") reference: String, #[source] source: BoxError, }, + /// Dependency dataset not found + /// + /// This occurs when the dependency reference does not exist in the dataset store. + #[error("Dependency '{alias}' not found ({reference})")] + DependencyNotFound { + /// The dependency alias used in the manifest + alias: String, + /// The dataset reference string (e.g., "dataset@version" or "dataset@hash") + reference: String, + }, + /// Failed to fetch dependency dataset for start_block validation /// /// This occurs when fetching the dataset definition for a dependency fails during @@ -531,6 +540,7 @@ pub enum ManifestValidationError { /// loading the actual dataset from the store failed. #[error("Failed to fetch dependency '{alias}' for start_block validation: {source}")] FetchDependencyDataset { + /// The dependency alias that failed to load alias: String, #[source] source: BoxError, @@ -543,6 +553,7 @@ pub enum ManifestValidationError { /// This error occurs during SQL parsing when a 3-part table reference is detected. 
#[error("Catalog-qualified table reference in table '{table_name}': {source}")] CatalogQualifiedTableInSql { + /// The table whose SQL query contains a catalog-qualified table reference table_name: TableName, #[source] source: ResolveTableReferencesError, @@ -553,7 +564,7 @@ pub enum ManifestValidationError { /// All tables must be qualified with a dataset reference in the schema portion. /// Unqualified tables (e.g., just `table_name`) are not allowed. #[error("Unqualified table reference: {0}")] - UnqualifiedTable(#[source] PlanningCtxForSqlTablesWithDepsError), + UnqualifiedTable(#[source] CreateLogicalCatalogError), /// Invalid table name /// @@ -566,7 +577,7 @@ pub enum ManifestValidationError { /// /// The referenced dataset does not exist in the store. #[error("Dataset not found: {0}")] - DatasetNotFound(#[source] PlanningCtxForSqlTablesWithDepsError), + DatasetNotFound(#[source] CreateLogicalCatalogError), /// Failed to retrieve dataset from store /// @@ -575,44 +586,44 @@ pub enum ManifestValidationError { /// - Unsupported dataset kind /// - Storage backend errors #[error("Failed to retrieve dataset from store: {0}")] - GetDataset(#[source] PlanningCtxForSqlTablesWithDepsError), + GetDataset(#[source] CreateLogicalCatalogError), /// Failed to create ETH call UDF /// /// This occurs when creating the eth_call user-defined function fails. #[error("Failed to create ETH call UDF: {0}")] - EthCallUdfCreation(#[source] PlanningCtxForSqlTablesWithDepsError), + EthCallUdfCreation(#[source] CreateLogicalCatalogError), /// Table not found in dataset /// /// The referenced table does not exist in the dataset. #[error("Table not found in dataset: {0}")] - TableNotFoundInDataset(#[source] PlanningCtxForSqlTablesWithDepsError), + TableNotFoundInDataset(#[source] CreateLogicalCatalogError), /// Function not found in dataset /// /// The referenced function does not exist in the dataset. #[error("Function not found in dataset: {0}")] - FunctionNotFoundInDataset(#[source] PlanningCtxForSqlTablesWithDepsError), + FunctionNotFoundInDataset(#[source] CreateLogicalCatalogError), /// eth_call function not available /// /// The eth_call function is not available for the referenced dataset. #[error("eth_call function not available: {0}")] - EthCallNotAvailable(#[source] PlanningCtxForSqlTablesWithDepsError), + EthCallNotAvailable(#[source] CreateLogicalCatalogError), /// Invalid dependency alias /// /// The dependency alias does not conform to alias rules (must start with letter, /// contain only alphanumeric/underscore, and be <= 63 bytes). #[error("Invalid dependency alias: {0}")] - InvalidDependencyAlias(#[source] PlanningCtxForSqlTablesWithDepsError), + InvalidDependencyAlias(#[source] CreateLogicalCatalogError), /// Dependency alias not found /// /// A table reference uses an alias that was not provided in the dependencies map. 
#[error("Dependency alias not found: {0}")] - DependencyAliasNotFound(#[source] PlanningCtxForSqlTablesWithDepsError), + DependencyAliasNotFound(#[source] CreateLogicalCatalogError), /// Non-incremental SQL operation in table query /// @@ -640,6 +651,7 @@ pub enum ManifestValidationError { /// - Ensure queries only use incremental operations (projection, filter, inner join, union) #[error("Table '{table_name}' contains non-incremental SQL: {source}")] NonIncrementalSql { + /// The table whose SQL query contains non-incremental operations table_name: TableName, #[source] source: BoxError, @@ -655,6 +667,7 @@ pub enum ManifestValidationError { /// - Unsupported SQL features #[error("Failed to plan query for table '{table_name}': {source}")] SqlPlanningError { + /// The table whose SQL query failed to plan table_name: TableName, #[source] source: QueryContextErr, @@ -668,7 +681,9 @@ pub enum ManifestValidationError { "derived dataset start_block ({dataset_start_block}) is before dependency's earliest available block ({dependency_earliest_block})" )] StartBlockBeforeDependencies { + /// The start block of the derived dataset dataset_start_block: BlockNum, + /// The earliest available block of the dependency dataset dependency_earliest_block: BlockNum, }, } diff --git a/crates/core/datasets-derived/src/manifest.rs b/crates/core/datasets-derived/src/manifest.rs index b3a4adaa2..7bcece83b 100644 --- a/crates/core/datasets-derived/src/manifest.rs +++ b/crates/core/datasets-derived/src/manifest.rs @@ -4,15 +4,14 @@ //! Derived datasets replace the legacy SQL dataset format, providing versioned, dependency-aware dataset //! definitions with explicit schemas and functions. -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeMap; use common::{BlockNum, sql_str::SqlStr}; // Re-export schema types from datasets-common -pub use datasets_common::manifest::{ArrowSchema, Field, TableSchema}; +pub use datasets_common::manifest::{ArrowSchema, Field, Function, FunctionSource, TableSchema}; use datasets_common::{ deps::{alias::DepAlias, reference::DepReference}, func_name::FuncName, - manifest::DataType, table_name::TableName, }; @@ -59,32 +58,6 @@ pub struct Table { pub network: String, } -/// User-defined function specification. -/// -/// Defines a custom function with input/output types and implementation source. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] -#[serde(rename_all = "camelCase")] -pub struct Function { - // TODO: Support SQL type names, see https://datafusion.apache.org/user-guide/sql/data_types.html - /// Arrow data types for function input parameters - pub input_types: Vec, - /// Arrow data type for function return value - pub output_type: DataType, - /// Function implementation source code and metadata - pub source: FunctionSource, -} - -/// Source code and metadata for a user-defined function. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] -pub struct FunctionSource { - /// Function implementation source code - pub source: Arc, - /// Filename where the function is defined - pub filename: String, -} - /// Input source for a table definition. /// /// Currently only SQL views are supported as table inputs. 
diff --git a/crates/core/dump/Cargo.toml b/crates/core/dump/Cargo.toml index 662366d7b..6327417c0 100644 --- a/crates/core/dump/Cargo.toml +++ b/crates/core/dump/Cargo.toml @@ -5,24 +5,24 @@ version.workspace = true license-file.workspace = true [dependencies] +alloy.workspace = true amp-data-store = { path = "../data-store" } +amp-dataset-store = { path = "../dataset-store" } async-stream.workspace = true -metadata-db = { path = "../metadata-db" } common = { path = "../common" } datafusion.workspace = true -amp-dataset-store = { path = "../dataset-store" } -datasets-derived = { path = "../datasets-derived" } datasets-common = { path = "../datasets-common" } +datasets-derived = { path = "../datasets-derived" } futures.workspace = true -tracing.workspace = true +metadata-db = { path = "../metadata-db" } +monitoring = { path = "../monitoring" } object_store.workspace = true +parking_lot = "0.12.5" +serde.workspace = true +serde_json.workspace = true thiserror.workspace = true tokio.workspace = true tokio-stream.workspace = true -url.workspace = true -serde_json.workspace = true -serde.workspace = true tokio-util.workspace = true -alloy.workspace = true -monitoring = { path = "../monitoring" } -parking_lot = "0.12.5" +tracing.workspace = true +url.workspace = true diff --git a/crates/core/dump/src/derived_dataset.rs b/crates/core/dump/src/derived_dataset.rs index 387d21196..2ae254d41 100644 --- a/crates/core/dump/src/derived_dataset.rs +++ b/crates/core/dump/src/derived_dataset.rs @@ -99,14 +99,19 @@ use std::{collections::BTreeMap, sync::Arc, time::Instant}; use amp_data_store::file_name::FileName; use common::{ BlockNum, BoxError, DetachedLogicalPlan, PlanningContext, QueryContext, - catalog::physical::{Catalog, PhysicalTable}, + catalog::{ + dump_derived_dataset as catalog, + physical::{Catalog, PhysicalTable}, + }, metadata::{Generation, segments::ResumeWatermark}, query_context::QueryEnv, + sql::{resolve_function_references, resolve_table_references}, }; -use datasets_common::{deps::alias::DepAlias, hash_reference::HashReference}; -use datasets_derived::{ - Manifest as DerivedManifest, catalog::catalog_for_sql_with_deps, manifest::TableInput, +use datasets_common::{ + deps::alias::{DepAlias, DepAliasOrSelfRef}, + hash_reference::HashReference, }; +use datasets_derived::{Manifest as DerivedManifest, manifest::TableInput}; use futures::StreamExt as _; use metadata_db::NotificationMultiplexerHandle; use tracing::{Instrument, instrument}; @@ -464,15 +469,19 @@ async fn dump_table( let mut join_set = tasks::FailFastJoinSet::>::new(); - let catalog = catalog_for_sql_with_deps( - &ctx.dataset_store, - &ctx.data_store, - &query, - &env, - &dependencies, - &manifest.functions, - ) - .await?; + let catalog = { + let table_refs = resolve_table_references::(&query)?; + let func_refs = resolve_function_references::(&query)?; + catalog::create_phy( + &ctx.dataset_store, + &ctx.data_store, + &env.isolate_pool, + &dependencies, + &manifest.functions, + (table_refs, func_refs), + ) + .await? 
+ }; let planning_ctx = PlanningContext::new(catalog.logical().clone()); let manifest_start_block = manifest.start_block; diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs index f78341ffd..bdd029bb7 100644 --- a/crates/services/admin-api/src/handlers/schema.rs +++ b/crates/services/admin-api/src/handlers/schema.rs @@ -8,11 +8,15 @@ use axum::{ }; use common::{ BoxError, + catalog::schema_inference::{ + self, CreateLogicalCatalogError, ResolveTablesError, ResolveUdfsError, TableReferencesMap, + }, plan_visitors::prepend_special_block_num_field, + planning_context::PlanningContext, query_context::Error as QueryContextError, sql::{ - FunctionReference, ResolveFunctionReferencesError, ResolveTableReferencesError, - TableReference, resolve_function_references, resolve_table_references, + ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references, + resolve_table_references, }, sql_str::SqlStr, }; @@ -26,12 +30,7 @@ use datasets_common::{ hash_reference::HashReference, table_name::TableName, }; -use datasets_derived::{ - catalog::{ - PlanningCtxForSqlTablesWithDepsError, planning_ctx_for_sql_tables_with_deps_and_funcs, - }, - manifest::{Function, TableSchema}, -}; +use datasets_derived::manifest::{Function, TableSchema}; use js_runtime::isolate_pool::IsolatePool; use tracing::instrument; @@ -40,15 +39,6 @@ use crate::{ handlers::error::{ErrorResponse, IntoErrorResponse}, }; -/// Type alias for table references map with dependency aliases or self-references -type TableReferencesMap = BTreeMap< - TableName, - ( - Vec>, - Vec>, - ), ->; - /// Handler for the `POST /schema` endpoint /// /// Analyzes SQL queries and returns the output schema without executing the query. @@ -267,67 +257,67 @@ pub async fn handler( (statements, references) }; - // Create planning context using resolved dependencies - let planning_ctx = planning_ctx_for_sql_tables_with_deps_and_funcs( + // Create logical catalog using resolved dependencies + let catalog = schema_inference::create( &ctx.dataset_store, - references, + IsolatePool::dummy(), // For schema validation only (no JS execution) dependencies, functions, - IsolatePool::dummy(), // For schema validation only (no JS execution) + references, ) .await .map_err(|err| match &err { - PlanningCtxForSqlTablesWithDepsError::UnqualifiedTable { .. } => { - Error::UnqualifiedTable(err) - } - PlanningCtxForSqlTablesWithDepsError::GetDatasetForTableRef { source, .. } => { - // NOTE: The source error is a BoxError from DatasetAccess::get_dataset(). - // When the underlying error is GetDatasetError::DatasetNotFound, we map to - // Error::DatasetNotFound to return HTTP 404. All other GetDatasetError variants - // (and other error types) map to Error::GetDataset for HTTP 500. - if source - .downcast_ref::() - .is_some_and(|e| matches!(e, GetDatasetError::DatasetNotFound(_))) - { - Error::DatasetNotFound(err) - } else { - Error::GetDataset(err) + CreateLogicalCatalogError::ResolveTables(inner) => match inner { + ResolveTablesError::UnqualifiedTable { .. } => Error::UnqualifiedTable(err), + ResolveTablesError::DependencyAliasNotFound { .. } => { + Error::DependencyAliasNotFound(err) } - } - PlanningCtxForSqlTablesWithDepsError::GetDatasetForFunction { source, .. } => { - // NOTE: The source error is a BoxError from DatasetAccess::get_dataset(). - // When the underlying error is GetDatasetError::DatasetNotFound, we map to - // Error::DatasetNotFound to return HTTP 404. 
All other GetDatasetError variants - // (and other error types) map to Error::GetDataset for HTTP 500. - if source - .downcast_ref::() - .is_some_and(|e| matches!(e, GetDatasetError::DatasetNotFound(_))) - { - Error::DatasetNotFound(err) - } else { - Error::GetDataset(err) + ResolveTablesError::GetDataset { source, .. } => { + // NOTE: The source error is a BoxError from DatasetAccess::get_dataset(). + // When the underlying error is GetDatasetError::DatasetNotFound, we map to + // Error::DatasetNotFound to return HTTP 404. All other GetDatasetError variants + // (and other error types) map to Error::GetDataset for HTTP 500. + if source + .downcast_ref::() + .is_some_and(|e| matches!(e, GetDatasetError::DatasetNotFound(_))) + { + Error::DatasetNotFound(err) + } else { + Error::GetDataset(err) + } } - } - PlanningCtxForSqlTablesWithDepsError::EthCallUdfCreationForFunction { .. } => { - Error::EthCallUdfCreation(err) - } - PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForTableRef { .. } => { - Error::DependencyAliasNotFound(err) - } - PlanningCtxForSqlTablesWithDepsError::DependencyAliasNotFoundForFunctionRef { .. } => { - Error::DependencyAliasNotFound(err) - } - PlanningCtxForSqlTablesWithDepsError::TableNotFoundInDataset { .. } => { - Error::TableNotFoundInDataset(err) - } - PlanningCtxForSqlTablesWithDepsError::FunctionNotFoundInDataset { .. } => { - Error::FunctionNotFoundInDataset(err) - } - PlanningCtxForSqlTablesWithDepsError::EthCallNotAvailable { .. } => { - Error::EthCallNotAvailable(err) - } + ResolveTablesError::TableNotFoundInDataset { .. } => Error::TableNotFoundInDataset(err), + }, + CreateLogicalCatalogError::ResolveUdfs(inner) => match inner { + ResolveUdfsError::DependencyAliasNotFound { .. } => Error::DependencyAliasNotFound(err), + ResolveUdfsError::GetDataset { source, .. } => { + // NOTE: The source error is a BoxError from DatasetAccess::get_dataset(). + // When the underlying error is GetDatasetError::DatasetNotFound, we map to + // Error::DatasetNotFound to return HTTP 404. All other GetDatasetError variants + // (and other error types) map to Error::GetDataset for HTTP 500. + if source + .downcast_ref::() + .is_some_and(|e| matches!(e, GetDatasetError::DatasetNotFound(_))) + { + Error::DatasetNotFound(err) + } else { + Error::GetDataset(err) + } + } + ResolveUdfsError::EthCallUdfCreation { .. } => Error::EthCallUdfCreation(err), + ResolveUdfsError::EthCallNotAvailable { .. } => Error::EthCallNotAvailable(err), + ResolveUdfsError::FunctionNotFoundInDataset { .. } => { + Error::FunctionNotFoundInDataset(err) + } + ResolveUdfsError::SelfReferencedFunctionNotFound { .. } => { + Error::FunctionNotFoundInDataset(err) + } + }, })?; + // Create planning context from catalog + let planning_ctx = PlanningContext::new(catalog); + // Infer schema for each table and extract networks let mut schemas = BTreeMap::new(); for (table_name, stmt) in statements { @@ -560,7 +550,7 @@ enum Error { /// All tables must be qualified with a dataset reference in the schema portion. /// Unqualified tables (e.g., just `table_name`) are not allowed. #[error(transparent)] - UnqualifiedTable(PlanningCtxForSqlTablesWithDepsError), + UnqualifiedTable(CreateLogicalCatalogError), /// Invalid table name /// @@ -575,7 +565,7 @@ enum Error { /// /// The referenced dataset does not exist in the store. 
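
The 404-versus-500 decision above hinges on downcasting the boxed source error, an idiom that works with any `Box<dyn Error + Send + Sync>`. A self-contained sketch of the mechanism; the `DatasetNotFound` type here is a stand-in for the real `GetDatasetError::DatasetNotFound` variant checked in the handler:

```rust
use std::{error::Error, fmt};

#[derive(Debug)]
struct DatasetNotFound(String);

impl fmt::Display for DatasetNotFound {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "dataset not found: {}", self.0)
    }
}
impl Error for DatasetNotFound {}

type BoxError = Box<dyn Error + Send + Sync>;

/// Returns true when the boxed error is the concrete "not found" case,
/// mirroring the `downcast_ref` checks in the schema handler above.
fn is_not_found(source: &BoxError) -> bool {
    source.downcast_ref::<DatasetNotFound>().is_some()
}

fn main() {
    let err: BoxError = Box::new(DatasetNotFound("_/eth_firehose".into()));
    assert!(is_not_found(&err));
}
```
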
#[error(transparent)] - DatasetNotFound(PlanningCtxForSqlTablesWithDepsError), + DatasetNotFound(CreateLogicalCatalogError), /// Failed to retrieve dataset from store /// @@ -584,31 +574,31 @@ enum Error { /// - Unsupported dataset kind /// - Storage backend errors #[error(transparent)] - GetDataset(PlanningCtxForSqlTablesWithDepsError), + GetDataset(CreateLogicalCatalogError), /// Failed to create ETH call UDF /// /// This occurs when creating the eth_call user-defined function fails. #[error(transparent)] - EthCallUdfCreation(PlanningCtxForSqlTablesWithDepsError), + EthCallUdfCreation(CreateLogicalCatalogError), /// Table not found in dataset /// /// The referenced table does not exist in the dataset. #[error(transparent)] - TableNotFoundInDataset(PlanningCtxForSqlTablesWithDepsError), + TableNotFoundInDataset(CreateLogicalCatalogError), /// Function not found in dataset /// /// The referenced function does not exist in the dataset. #[error(transparent)] - FunctionNotFoundInDataset(PlanningCtxForSqlTablesWithDepsError), + FunctionNotFoundInDataset(CreateLogicalCatalogError), /// eth_call function not available /// /// The eth_call function is not available for the referenced dataset. #[error(transparent)] - EthCallNotAvailable(PlanningCtxForSqlTablesWithDepsError), + EthCallNotAvailable(CreateLogicalCatalogError), /// Invalid dependency alias in table reference /// @@ -661,7 +651,7 @@ enum Error { /// /// A table or function reference uses an alias that was not provided in the dependencies map. #[error(transparent)] - DependencyAliasNotFound(PlanningCtxForSqlTablesWithDepsError), + DependencyAliasNotFound(CreateLogicalCatalogError), /// Failed to infer schema for table /// diff --git a/crates/services/server/Cargo.toml b/crates/services/server/Cargo.toml index 02a4a42e2..533535ae6 100644 --- a/crates/services/server/Cargo.toml +++ b/crates/services/server/Cargo.toml @@ -14,9 +14,11 @@ bincode.workspace = true bytes.workspace = true common = { path = "../../core/common" } datafusion.workspace = true +datasets-common = { path = "../../core/datasets-common" } amp-dataset-store = { path = "../../core/dataset-store" } dump = { path = "../../core/dump" } futures.workspace = true +js-runtime = { path = "../../core/js-runtime" } metadata-db = { path = "../../core/metadata-db" } monitoring = { path = "../../core/monitoring" } prost.workspace = true diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 1ad463857..ffcaaba82 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -29,23 +29,28 @@ use common::{ ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}, }, catalog::{ - errors::{CatalogForSqlError, PlanningCtxForSqlError}, physical::Catalog, - sql::{catalog_for_sql, planning_ctx_for_sql}, + query_server::{CatalogForSqlError, CreateLogicalCatalogError, create_logical, create_phy}, }, metadata::segments::{BlockRange, ResumeWatermark}, query_context::{Error as CoreError, QueryEnv}, + sql::{ + ResolveFunctionReferencesError, ResolveTableReferencesError, resolve_function_references, + resolve_table_references, + }, sql_str::SqlStr, utils::error_with_causes, }; use datafusion::{ common::DFSchema, error::DataFusionError, physical_plan::stream::RecordBatchStreamAdapter, }; +use datasets_common::partial_reference::{PartialReference, PartialReferenceError}; use dump::streaming_query::{QueryMessage, StreamingQuery}; use futures::{ Stream, StreamExt as _, TryStreamExt, stream::{self, BoxStream}, }; +use 
js_runtime::isolate_pool::IsolatePool; use metadata_db::{MetadataDb, NotificationMultiplexerHandle, notification_multiplexer}; use monitoring::telemetry::metrics::Meter; use prost::Message as _; @@ -102,14 +107,20 @@ impl Service { ) -> Result { let query = common::sql::parse(sql.as_ref()) .map_err(|err| Error::CoreError(CoreError::SqlParseError(err)))?; - let catalog = catalog_for_sql( - &self.dataset_store, - &self.data_store, - &query, - self.env.clone(), - ) - .await - .map_err(Error::CatalogForSqlError)?; + let catalog = { + let table_refs = resolve_table_references::(&query) + .map_err(Error::TableReferenceResolution)?; + let func_refs = resolve_function_references::(&query) + .map_err(Error::FunctionReferenceResolution)?; + create_phy( + &self.dataset_store, + &self.data_store, + &self.env.isolate_pool, + (table_refs, func_refs), + ) + .await + .map_err(Error::CatalogForSqlError) + }?; let ctx = PlanningContext::new(catalog.logical().clone()); let plan = ctx @@ -282,11 +293,25 @@ impl Service { .query .parse::() .map_err(|err| Error::InvalidQuery(err.to_string()))?; + let query = common::sql::parse(&sql_str) .map_err(|err| Error::CoreError(CoreError::SqlParseError(err)))?; - let plan_ctx = planning_ctx_for_sql(&self.dataset_store, &query) + let plan_ctx = { + let table_refs = resolve_table_references::(&query) + .map_err(Error::TableReferenceResolution)?; + let func_refs = resolve_function_references::(&query) + .map_err(Error::FunctionReferenceResolution)?; + + create_logical( + &self.dataset_store, + &IsolatePool::dummy(), + (table_refs, func_refs), + ) .await - .map_err(Error::PlanningCtxForSqlError)?; + .map(PlanningContext::new) + .map_err(Error::CreateLogicalCatalogError) + }?; + let is_streaming = streaming_override .unwrap_or_else(|| common::stream_helpers::is_streaming(&query)); let schema = plan_ctx @@ -915,8 +940,14 @@ pub enum Error { #[error("error loading catalog for SQL")] CatalogForSqlError(#[source] CatalogForSqlError), - #[error("error creating planning context")] - PlanningCtxForSqlError(#[source] PlanningCtxForSqlError), + #[error("error creating logical catalog")] + CreateLogicalCatalogError(#[source] CreateLogicalCatalogError), + + #[error("Failed to resolve table references from SQL")] + TableReferenceResolution(#[source] ResolveTableReferencesError), + + #[error("Failed to resolve function references from SQL")] + FunctionReferenceResolution(#[source] ResolveFunctionReferencesError), #[error(transparent)] CoreError(CoreError), @@ -943,7 +974,9 @@ impl Error { Error::ExecutionError(_) => "EXECUTION_ERROR", Error::DatasetStoreError(_) => "DATASET_STORE_ERROR", Error::CatalogForSqlError(_) => "CATALOG_FOR_SQL_ERROR", - Error::PlanningCtxForSqlError(_) => "PLANNING_CTX_FOR_SQL_ERROR", + Error::CreateLogicalCatalogError(_) => "CREATE_LOGICAL_CATALOG_ERROR", + Error::TableReferenceResolution(_) => "TABLE_REFERENCE_RESOLUTION", + Error::FunctionReferenceResolution(_) => "FUNCTION_REFERENCE_RESOLUTION", Error::CoreError(CoreError::InvalidPlan(_)) => "INVALID_PLAN", Error::CoreError(CoreError::SqlParseError(_)) => "SQL_PARSE_ERROR", Error::CoreError(CoreError::DatasetError(_)) => "DATASET_ERROR", @@ -992,7 +1025,9 @@ impl IntoResponse for Error { Error::DatasetStoreError(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::CatalogForSqlError(ref e) if e.is_table_not_found() => StatusCode::NOT_FOUND, Error::CatalogForSqlError(_) => StatusCode::INTERNAL_SERVER_ERROR, - Error::PlanningCtxForSqlError(_) => StatusCode::INTERNAL_SERVER_ERROR, + 
Error::CreateLogicalCatalogError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Error::TableReferenceResolution(_) => StatusCode::BAD_REQUEST, + Error::FunctionReferenceResolution(_) => StatusCode::BAD_REQUEST, Error::PbDecodeError(_) => StatusCode::BAD_REQUEST, Error::UnsupportedFlightDescriptorType(_) => StatusCode::BAD_REQUEST, Error::UnsupportedFlightDescriptorCommand(_) => StatusCode::BAD_REQUEST, @@ -1033,7 +1068,9 @@ impl From for Status { Error::ExecutionError(df) => datafusion_error_to_status(df, message), Error::StreamingExecutionError(_) => Status::internal(message), Error::CatalogForSqlError(_) => Status::internal(message), - Error::PlanningCtxForSqlError(_) => Status::internal(message), + Error::CreateLogicalCatalogError(_) => Status::internal(message), + Error::TableReferenceResolution(_) => Status::invalid_argument(message), + Error::FunctionReferenceResolution(_) => Status::invalid_argument(message), Error::InvalidQuery(_) => Status::invalid_argument(message), Error::TicketEncodingError(_) => Status::invalid_argument(message), Error::TicketDecodingError(_) => Status::invalid_argument(message), diff --git a/tests/src/tests/it_admin_api_schema.rs b/tests/src/tests/it_admin_api_schema.rs index 7ebfb3c72..d11d7fe9f 100644 --- a/tests/src/tests/it_admin_api_schema.rs +++ b/tests/src/tests/it_admin_api_schema.rs @@ -1424,7 +1424,7 @@ async fn function_not_in_dataset_fails_at_catalog_construction() { "should return FUNCTION_NOT_FOUND_IN_DATASET error" ); assert!( - response.error_message.contains("eth.nonexistent_function") + response.error_message.contains("nonexistent_function") && response.error_message.contains("_/eth_firehose"), "error message should indicate function and dataset, got: {}", response.error_message @@ -1726,7 +1726,7 @@ async fn multiple_functions_mixed_validity_fails() { "should return FUNCTION_NOT_FOUND_IN_DATASET error" ); assert!( - response.error_message.contains("eth.nonexistent_fn"), + response.error_message.contains("nonexistent_fn"), "error message should indicate the invalid function, got: {}", response.error_message ); diff --git a/tests/src/tests/it_reorg.rs b/tests/src/tests/it_reorg.rs index 7d73e1520..036be1037 100644 --- a/tests/src/tests/it_reorg.rs +++ b/tests/src/tests/it_reorg.rs @@ -3,9 +3,13 @@ use std::{collections::BTreeMap, ops::RangeInclusive, time::Duration}; use alloy::primitives::BlockHash; use arrow_flight::FlightData; use common::{ - BlockNum, catalog::sql::catalog_for_sql, metadata::segments::BlockRange, sql, sql_str::SqlStr, + BlockNum, + catalog::query_server as catalog_server, + metadata::segments::BlockRange, + sql::{self, resolve_function_references, resolve_table_references}, + sql_str::SqlStr, }; -use datasets_common::reference::Reference; +use datasets_common::{partial_reference::PartialReference, reference::Reference}; use monitoring::logging; use rand::{Rng, RngCore, SeedableRng as _, rngs::StdRng}; use serde::Deserialize; @@ -468,14 +472,20 @@ impl ReorgTestCtx { .config() .make_query_env() .expect("Failed to create query environment"); - let catalog = catalog_for_sql( - test_env.daemon_server().dataset_store(), - test_env.daemon_server().data_store(), - &sql, - env, - ) - .await - .expect("Failed to create catalog for SQL query"); + let catalog = { + let table_refs = resolve_table_references::(&sql) + .expect("Failed to resolve table references"); + let func_refs = resolve_function_references::(&sql) + .expect("Failed to resolve function references"); + catalog_server::create_phy( + 
test_env.daemon_server().dataset_store(), + test_env.daemon_server().data_store(), + &env.isolate_pool, + (table_refs, func_refs), + ) + .await + .expect("Failed to create catalog for SQL query") + }; let table = catalog .tables() .iter()