Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file.

- Support activating and deactivation Trino clusters via API calls to `/admin/clusters/{cluster_name}/activate` and `/admin/clusters/{cluster_name}/deactivate` respectively. For this to work you need to authenticate yourself at trino-lb via basic auth ([#95]).
- Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]).
- Support configuring an external endpoint of Trino clusters.
This is used to update the segments ackUris to, as sometimes Trino get's confused and put's the wrong endpoint (namely the one of trino-lb) in there.
Please note that this runs a database migration on Postgres ([#100]).

### Changed

Expand All @@ -30,6 +33,7 @@ All notable changes to this project will be documented in this file.
[#91]: https://github.com/stackabletech/trino-lb/pull/91
[#95]: https://github.com/stackabletech/trino-lb/pull/95
[#98]: https://github.com/stackabletech/trino-lb/pull/98
[#100]: https://github.com/stackabletech/trino-lb/pull/100

## [0.5.0] - 2025-03-14

Expand Down
4 changes: 4 additions & 0 deletions trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub struct TrinoClusterGroupConfig {
pub struct TrinoClusterConfig {
pub name: String,
pub endpoint: Url,

/// Public endpoint of the Trino cluster.
/// This can e.g. be used to change segment ackUris to.
pub external_endpoint: Option<Url>,
pub credentials: TrinoClusterCredentialsConfig,
}

Expand Down
128 changes: 126 additions & 2 deletions trino-lb-core/src/trino_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use prusto::{QueryError, Warning};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Value, value::RawValue};
use snafu::{ResultExt, Snafu};
use tracing::instrument;
use url::Url;
Expand All @@ -23,6 +23,12 @@ pub enum Error {
#[snafu(display("Failed to parse nextUri Trino send us"))]
ParseNextUriFromTrino { source: url::ParseError },

#[snafu(display("Failed to parse segment ackUri Trino send us"))]
ParseSegmentAckUriFromTrino { source: url::ParseError },

#[snafu(display("Failed to change segment ackUri to point to external Trino address"))]
ChangeSegmentAckUriToTrino { source: url::ParseError },

#[snafu(display(
"Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"
))]
Expand All @@ -48,7 +54,7 @@ pub struct TrinoQueryApiResponse {
pub partial_cancel_uri: Option<String>,

pub columns: Option<Box<RawValue>>,
pub data: Option<Box<RawValue>>,
pub data: Option<Box<Value>>,

pub error: Option<QueryError>,
pub warnings: Vec<Warning>,
Expand Down Expand Up @@ -172,17 +178,82 @@ impl TrinoQueryApiResponse {
update_count: None,
})
}

/// Changes the following references in the query (if they exist)
///
/// 1. nextUri to point to trino-lb
/// 2. In case the `external_trino_addr` is set, segments ackUri to point to the external
/// address of Trino. Trino sometimes get's confused (likely by some HTTP) headers and put's the
/// trino-lb address into the ackUri (but the requests should go to Trino directly).
#[instrument(
skip(self),
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn update_trino_references(
&mut self,
trino_lb_addr: &Url,
external_trino_addr: Option<&Url>,
) -> Result<(), Error> {
// Point nextUri to trino-lb
if let Some(next_uri) = &self.next_uri {
let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string());
}

// Point segment ackUris to Trino
if let Some(external_trino_addr) = external_trino_addr
&& let Some(data) = self.data.as_deref_mut()
{
change_segment_ack_uris_to_trino(data, external_trino_addr)?;
}

Ok(())
}
}

#[instrument(
fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr),
)]
fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url {
let mut result = trino_lb_addr.clone();
result.set_path(next_uri.path());
result
}

#[instrument(
skip(data),
fields(external_trino_addr = %external_trino_addr),
)]
fn change_segment_ack_uris_to_trino(
data: &mut Value,
external_trino_addr: &Url,
) -> Result<(), Error> {
let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else {
return Ok(());
};

for segment in segments {
if let Some("spooled") = segment.get("type").and_then(Value::as_str)
&& let Some(ack_uri) = segment.get_mut("ackUri")
&& let Some(ack_uri_str) = ack_uri.as_str()
{
let parsed_ack_uri = ack_uri_str
.parse::<Url>()
.context(ParseSegmentAckUriFromTrinoSnafu)?;
let mut result = external_trino_addr.clone();
result.set_path(parsed_ack_uri.path());

*ack_uri = Value::String(result.to_string());
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use rstest::rstest;
use serde_json::json;

use super::*;

Expand Down Expand Up @@ -214,4 +285,57 @@ mod tests {
let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr);
assert_eq!(result.to_string(), expected);
}

#[test]
fn test_change_segment_ack_uris_to_trino() {
let mut data = json!({
"encoding": "json+zstd",
"segments": [
{
"type": "spooled",
"uri": "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d",
"ackUri": "https://trino-client-spooling-coordinator:8443/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=",
"metadata": {
"segmentSize": 2716023,
"uncompressedSize": 7706400,
"rowsCount": 43761,
"expiresAt": "2025-12-13T01:16:21.454",
"rowOffset": 10952
},
"headers": {
"x-amz-server-side-encryption-customer-algorithm": [
"AES256"
],
"x-amz-server-side-encryption-customer-key": [
"iemW0eosEhVVn+QR3q/OApysz8ieRCzAHngdoJFlbHY="
],
"x-amz-server-side-encryption-customer-key-MD5": [
"D1VfXAwD/ffApNMNf3gBig=="
]
}
}
]
});
let external_trino_addr = "https://trino-external:1234"
.parse()
.expect("static URL is always valid");

change_segment_ack_uris_to_trino(&mut data, &external_trino_addr).unwrap();

let segment = data
.get("segments")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap();
assert_eq!(
segment.get("uri").unwrap(),
"https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d"
);
assert_eq!(
segment.get("ackUri").unwrap(),
"https://trino-external:1234/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4="
);
}
}
6 changes: 6 additions & 0 deletions trino-lb-core/src/trino_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct TrinoQuery {
/// Endpoint of the Trino cluster the query is running on.
pub trino_endpoint: Url,

/// (Optionally, if configured) public endpoint of the Trino cluster.
/// This can e.g. be used to change segment ackUris to.
pub trino_external_endpoint: Option<Url>,

/// The time the query was submitted to trino-lb.
pub creation_time: SystemTime,

Expand Down Expand Up @@ -80,13 +84,15 @@ impl TrinoQuery {
trino_cluster: TrinoClusterName,
trino_query_id: TrinoQueryId,
trino_endpoint: Url,
trino_external_endpoint: Option<Url>,
creation_time: SystemTime,
delivered_time: SystemTime,
) -> Self {
TrinoQuery {
id: trino_query_id,
trino_cluster,
trino_endpoint,
trino_external_endpoint,
creation_time,
delivered_time,
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions trino-lb-persistence/src/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Postgres sqlx stuff

First start a postgres:

```bash
docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres
```

Afterwards you set the `DATABASE_URL` env var and prepare stuff for offline compilation:

```bash
export DATABASE_URL=postgres://postgres:postgres@localhost/postgres

cd trino-lb-persistence

cargo sqlx migrate run --source src/postgres/migrations
cargo sqlx prepare --workspace
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE queries
-- nullable, as it's Option<&str>
ADD trino_external_endpoint VARCHAR;
18 changes: 15 additions & 3 deletions trino-lb-persistence/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub enum Error {
#[snafu(display("Failed to parse endpoint url of cluster from stored query"))]
ParseClusterEndpointFromStoredQuery { source: url::ParseError },

#[snafu(display("Failed to parse external endpoint url of cluster from stored query"))]
ParseClusterExternalEndpointFromStoredQuery { source: url::ParseError },

#[snafu(display("Failed to convert max query counter to u64, as it is too high"))]
ConvertMaxAllowedQueryCounterToU64 { source: TryFromIntError },

Expand Down Expand Up @@ -204,11 +207,12 @@ impl Persistence for PostgresPersistence {
#[instrument(skip(self, query))]
async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> {
query!(
r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)
VALUES ($1, $2, $3, $4, $5)"#,
r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time)
VALUES ($1, $2, $3, $4, $5, $6)"#,
query.id,
query.trino_cluster,
query.trino_endpoint.as_str(),
query.trino_external_endpoint.as_ref().map(Url::as_str),
Into::<DateTime<Utc>>::into(query.creation_time),
Into::<DateTime<Utc>>::into(query.delivered_time),
)
Expand All @@ -222,7 +226,7 @@ impl Persistence for PostgresPersistence {
#[instrument(skip(self))]
async fn load_query(&self, query_id: &TrinoQueryId) -> Result<TrinoQuery, super::Error> {
let result = query!(
r#"SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time
r#"SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time
FROM queries
WHERE id = $1"#,
query_id,
Expand All @@ -231,11 +235,19 @@ impl Persistence for PostgresPersistence {
.await
.context(LoadQuerySnafu)?;

let trino_external_endpoint = match result.trino_external_endpoint {
Some(trino_external_endpoint) => Some(
Url::parse(&trino_external_endpoint)
.context(ParseClusterExternalEndpointFromStoredQuerySnafu)?,
),
None => None,
};
let query = TrinoQuery {
id: result.id,
trino_cluster: result.trino_cluster,
trino_endpoint: Url::parse(&result.trino_endpoint)
.context(ParseClusterEndpointFromStoredQuerySnafu)?,
trino_external_endpoint,
creation_time: result.creation_time.into(),
delivered_time: result.delivered_time.into(),
};
Expand Down
2 changes: 2 additions & 0 deletions trino-lb/src/cluster_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct TrinoCluster {
pub name: String,
pub max_running_queries: u64,
pub endpoint: Url,
pub external_endpoint: Option<Url>,
}

#[derive(Clone, Debug, Serialize)]
Expand Down Expand Up @@ -145,6 +146,7 @@ impl ClusterGroupManager {
name: cluster_name,
max_running_queries: group_config.max_running_queries,
endpoint: cluster_config.endpoint.clone(),
external_endpoint: cluster_config.external_endpoint.clone(),
})
}
groups.insert(group_name.clone(), group);
Expand Down
Loading
Loading