diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f31d25..213d713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file. - Support activating and deactivation Trino clusters via API calls to `/admin/clusters/{cluster_name}/activate` and `/admin/clusters/{cluster_name}/deactivate` respectively. For this to work you need to authenticate yourself at trino-lb via basic auth ([#95]). - Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]). +- Support configuring an external endpoint of Trino clusters. + This is used to update the segments ackUris to, as sometimes Trino get's confused and put's the wrong endpoint (namely the one of trino-lb) in there. + Please note that this runs a database migration on Postgres ([#100]). ### Changed @@ -30,6 +33,7 @@ All notable changes to this project will be documented in this file. [#91]: https://github.com/stackabletech/trino-lb/pull/91 [#95]: https://github.com/stackabletech/trino-lb/pull/95 [#98]: https://github.com/stackabletech/trino-lb/pull/98 +[#100]: https://github.com/stackabletech/trino-lb/pull/100 ## [0.5.0] - 2025-03-14 diff --git a/trino-lb-core/src/config.rs b/trino-lb-core/src/config.rs index 19c4287..71745ca 100644 --- a/trino-lb-core/src/config.rs +++ b/trino-lb-core/src/config.rs @@ -191,6 +191,10 @@ pub struct TrinoClusterGroupConfig { pub struct TrinoClusterConfig { pub name: String, pub endpoint: Url, + + /// Public endpoint of the Trino cluster. + /// This can e.g. be used to change segment ackUris to. + pub external_endpoint: Option, pub credentials: TrinoClusterCredentialsConfig, } diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index 4e9c831..b3c06c4 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -5,7 +5,7 @@ use std::{ use prusto::{QueryError, Warning}; use serde::{Deserialize, Serialize}; -use serde_json::value::RawValue; +use serde_json::{Value, value::RawValue}; use snafu::{ResultExt, Snafu}; use tracing::instrument; use url::Url; @@ -23,6 +23,12 @@ pub enum Error { #[snafu(display("Failed to parse nextUri Trino send us"))] ParseNextUriFromTrino { source: url::ParseError }, + #[snafu(display("Failed to parse segment ackUri Trino send us"))] + ParseSegmentAckUriFromTrino { source: url::ParseError }, + + #[snafu(display("Failed to change segment ackUri to point to external Trino address"))] + ChangeSegmentAckUriToTrino { source: url::ParseError }, + #[snafu(display( "Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?" ))] @@ -48,7 +54,7 @@ pub struct TrinoQueryApiResponse { pub partial_cancel_uri: Option, pub columns: Option>, - pub data: Option>, + pub data: Option>, pub error: Option, pub warnings: Vec, @@ -172,17 +178,82 @@ impl TrinoQueryApiResponse { update_count: None, }) } + + /// Changes the following references in the query (if they exist) + /// + /// 1. nextUri to point to trino-lb + /// 2. In case the `external_trino_addr` is set, segments ackUri to point to the external + /// address of Trino. Trino sometimes get's confused (likely by some HTTP) headers and put's the + /// trino-lb address into the ackUri (but the requests should go to Trino directly). + #[instrument( + skip(self), + fields(trino_lb_addr = %trino_lb_addr), + )] + pub fn update_trino_references( + &mut self, + trino_lb_addr: &Url, + external_trino_addr: Option<&Url>, + ) -> Result<(), Error> { + // Point nextUri to trino-lb + if let Some(next_uri) = &self.next_uri { + let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?; + self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string()); + } + + // Point segment ackUris to Trino + if let Some(external_trino_addr) = external_trino_addr + && let Some(data) = self.data.as_deref_mut() + { + change_segment_ack_uris_to_trino(data, external_trino_addr)?; + } + + Ok(()) + } } +#[instrument( + fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr), +)] fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url { let mut result = trino_lb_addr.clone(); result.set_path(next_uri.path()); result } +#[instrument( + skip(data), + fields(external_trino_addr = %external_trino_addr), +)] +fn change_segment_ack_uris_to_trino( + data: &mut Value, + external_trino_addr: &Url, +) -> Result<(), Error> { + let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else { + return Ok(()); + }; + + for segment in segments { + if let Some("spooled") = segment.get("type").and_then(Value::as_str) + && let Some(ack_uri) = segment.get_mut("ackUri") + && let Some(ack_uri_str) = ack_uri.as_str() + { + let parsed_ack_uri = ack_uri_str + .parse::() + .context(ParseSegmentAckUriFromTrinoSnafu)?; + let mut result = external_trino_addr.clone(); + result.set_path(parsed_ack_uri.path()); + + *ack_uri = Value::String(result.to_string()); + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use rstest::rstest; + use serde_json::json; use super::*; @@ -214,4 +285,57 @@ mod tests { let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr); assert_eq!(result.to_string(), expected); } + + #[test] + fn test_change_segment_ack_uris_to_trino() { + let mut data = json!({ + "encoding": "json+zstd", + "segments": [ + { + "type": "spooled", + "uri": "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d", + "ackUri": "https://trino-client-spooling-coordinator:8443/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=", + "metadata": { + "segmentSize": 2716023, + "uncompressedSize": 7706400, + "rowsCount": 43761, + "expiresAt": "2025-12-13T01:16:21.454", + "rowOffset": 10952 + }, + "headers": { + "x-amz-server-side-encryption-customer-algorithm": [ + "AES256" + ], + "x-amz-server-side-encryption-customer-key": [ + "iemW0eosEhVVn+QR3q/OApysz8ieRCzAHngdoJFlbHY=" + ], + "x-amz-server-side-encryption-customer-key-MD5": [ + "D1VfXAwD/ffApNMNf3gBig==" + ] + } + } + ] + }); + let external_trino_addr = "https://trino-external:1234" + .parse() + .expect("static URL is always valid"); + + change_segment_ack_uris_to_trino(&mut data, &external_trino_addr).unwrap(); + + let segment = data + .get("segments") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap(); + assert_eq!( + segment.get("uri").unwrap(), + "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d" + ); + assert_eq!( + segment.get("ackUri").unwrap(), + "https://trino-external:1234/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=" + ); + } } diff --git a/trino-lb-core/src/trino_query.rs b/trino-lb-core/src/trino_query.rs index 9701ac8..ebf0d0e 100644 --- a/trino-lb-core/src/trino_query.rs +++ b/trino-lb-core/src/trino_query.rs @@ -52,6 +52,10 @@ pub struct TrinoQuery { /// Endpoint of the Trino cluster the query is running on. pub trino_endpoint: Url, + /// (Optionally, if configured) public endpoint of the Trino cluster. + /// This can e.g. be used to change segment ackUris to. + pub trino_external_endpoint: Option, + /// The time the query was submitted to trino-lb. pub creation_time: SystemTime, @@ -80,6 +84,7 @@ impl TrinoQuery { trino_cluster: TrinoClusterName, trino_query_id: TrinoQueryId, trino_endpoint: Url, + trino_external_endpoint: Option, creation_time: SystemTime, delivered_time: SystemTime, ) -> Self { @@ -87,6 +92,7 @@ impl TrinoQuery { id: trino_query_id, trino_cluster, trino_endpoint, + trino_external_endpoint, creation_time, delivered_time, } diff --git a/trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json b/trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json similarity index 60% rename from trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json rename to trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json index a184cf3..a2abdf4 100644 --- a/trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json +++ b/trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)\n VALUES ($1, $2, $3, $4, $5)", + "query": "INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time)\n VALUES ($1, $2, $3, $4, $5, $6)", "describe": { "columns": [], "parameters": { @@ -8,11 +8,12 @@ "Varchar", "Varchar", "Varchar", + "Varchar", "Timestamptz", "Timestamptz" ] }, "nullable": [] }, - "hash": "dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b" + "hash": "1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded" } diff --git a/trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json b/trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json similarity index 66% rename from trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json rename to trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json index 45ce62e..cb5a1ab 100644 --- a/trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json +++ b/trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time\n FROM queries\n WHERE id = $1", + "query": "SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time\n FROM queries\n WHERE id = $1", "describe": { "columns": [ { @@ -20,11 +20,16 @@ }, { "ordinal": 3, + "name": "trino_external_endpoint", + "type_info": "Varchar" + }, + { + "ordinal": 4, "name": "creation_time", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "delivered_time", "type_info": "Timestamptz" } @@ -38,9 +43,10 @@ false, false, false, + true, false, false ] }, - "hash": "d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1" + "hash": "e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1" } diff --git a/trino-lb-persistence/src/postgres/README.md b/trino-lb-persistence/src/postgres/README.md new file mode 100644 index 0000000..20827c2 --- /dev/null +++ b/trino-lb-persistence/src/postgres/README.md @@ -0,0 +1,18 @@ +# Postgres sqlx stuff + +First start a postgres: + +```bash +docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres +``` + +Afterwards you set the `DATABASE_URL` env var and prepare stuff for offline compilation: + +```bash +export DATABASE_URL=postgres://postgres:postgres@localhost/postgres + +cd trino-lb-persistence + +cargo sqlx migrate run --source src/postgres/migrations +cargo sqlx prepare --workspace +``` diff --git a/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql b/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql new file mode 100644 index 0000000..aff5474 --- /dev/null +++ b/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql @@ -0,0 +1,3 @@ +ALTER TABLE queries + -- nullable, as it's Option<&str> +ADD trino_external_endpoint VARCHAR; diff --git a/trino-lb-persistence/src/postgres/mod.rs b/trino-lb-persistence/src/postgres/mod.rs index 710eb31..7d08f25 100644 --- a/trino-lb-persistence/src/postgres/mod.rs +++ b/trino-lb-persistence/src/postgres/mod.rs @@ -92,6 +92,9 @@ pub enum Error { #[snafu(display("Failed to parse endpoint url of cluster from stored query"))] ParseClusterEndpointFromStoredQuery { source: url::ParseError }, + #[snafu(display("Failed to parse external endpoint url of cluster from stored query"))] + ParseClusterExternalEndpointFromStoredQuery { source: url::ParseError }, + #[snafu(display("Failed to convert max query counter to u64, as it is too high"))] ConvertMaxAllowedQueryCounterToU64 { source: TryFromIntError }, @@ -204,11 +207,12 @@ impl Persistence for PostgresPersistence { #[instrument(skip(self, query))] async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> { query!( - r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time) - VALUES ($1, $2, $3, $4, $5)"#, + r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time) + VALUES ($1, $2, $3, $4, $5, $6)"#, query.id, query.trino_cluster, query.trino_endpoint.as_str(), + query.trino_external_endpoint.as_ref().map(Url::as_str), Into::>::into(query.creation_time), Into::>::into(query.delivered_time), ) @@ -222,7 +226,7 @@ impl Persistence for PostgresPersistence { #[instrument(skip(self))] async fn load_query(&self, query_id: &TrinoQueryId) -> Result { let result = query!( - r#"SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time + r#"SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time FROM queries WHERE id = $1"#, query_id, @@ -231,11 +235,19 @@ impl Persistence for PostgresPersistence { .await .context(LoadQuerySnafu)?; + let trino_external_endpoint = match result.trino_external_endpoint { + Some(trino_external_endpoint) => Some( + Url::parse(&trino_external_endpoint) + .context(ParseClusterExternalEndpointFromStoredQuerySnafu)?, + ), + None => None, + }; let query = TrinoQuery { id: result.id, trino_cluster: result.trino_cluster, trino_endpoint: Url::parse(&result.trino_endpoint) .context(ParseClusterEndpointFromStoredQuerySnafu)?, + trino_external_endpoint, creation_time: result.creation_time.into(), delivered_time: result.delivered_time.into(), }; diff --git a/trino-lb/src/cluster_group_manager.rs b/trino-lb/src/cluster_group_manager.rs index b98f419..1156399 100644 --- a/trino-lb/src/cluster_group_manager.rs +++ b/trino-lb/src/cluster_group_manager.rs @@ -86,6 +86,7 @@ pub struct TrinoCluster { pub name: String, pub max_running_queries: u64, pub endpoint: Url, + pub external_endpoint: Option, } #[derive(Clone, Debug, Serialize)] @@ -145,6 +146,7 @@ impl ClusterGroupManager { name: cluster_name, max_running_queries: group_config.max_running_queries, endpoint: cluster_config.endpoint.clone(), + external_endpoint: cluster_config.external_endpoint.clone(), }) } groups.insert(group_name.clone(), group); diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index 47f1ae2..2c03dad 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -304,6 +304,7 @@ async fn queue_or_hand_over_query( cluster.name.clone(), trino_query_api_response.id.clone(), cluster.endpoint.clone(), + cluster.external_endpoint.clone(), *creation_time, SystemTime::now(), ); @@ -316,7 +317,10 @@ async fn queue_or_hand_over_query( )?; trino_query_api_response - .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address) + .update_trino_references( + &state.config.trino_lb.external_address, + cluster.external_endpoint.as_ref(), + ) .context(ModifyNextUriSnafu)?; info!( @@ -481,7 +485,10 @@ async fn handle_query_running_on_trino( } else { // Change the nextUri to actually point to trino-lb instead of Trino. trino_query_api_response - .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address) + .update_trino_references( + &state.config.trino_lb.external_address, + query.trino_external_endpoint.as_ref(), + ) .context(ModifyNextUriSnafu)?; } diff --git a/trino-lb/src/scaling/mod.rs b/trino-lb/src/scaling/mod.rs index 9fdf66f..8c83010 100644 --- a/trino-lb/src/scaling/mod.rs +++ b/trino-lb/src/scaling/mod.rs @@ -200,6 +200,7 @@ impl Scaler { name: cluster_name, max_running_queries: group_config.max_running_queries, endpoint: cluster_config.endpoint.clone(), + external_endpoint: cluster_config.external_endpoint.clone(), }) } groups.insert(group_name.clone(), group);