Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
725 changes: 505 additions & 220 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ edition = "2024"
repository = "https://github.com/stackabletech/trino-lb"

[workspace.dependencies]
askama = "0.14"
askama = "0.15"
axum = { version = "0.8", features = ["tracing"] }
axum-extra = { version = "0.12", features = ["typed-header"] }
# If we use the feature "tls-rustls" it will pull in the "aws-lc-rs" crate, which as of 2024-08-16 I did not get to build in the "make run-dev" workflow :/
axum-server = { version = "0.7", features = ["tls-rustls-no-provider"] }
axum-server = { version = "0.8", features = ["tls-rustls"] }
bincode = { version = "2.0", features = ["serde"] }
chrono = "0.4"
clap = { version = "4.5", features = ["derive"] }
Expand All @@ -29,10 +28,10 @@ http = "1.1"
http-serde = "2.1"
humantime-serde = "1.1"
indoc = "2.0"
k8s-openapi = { version = "0.26", default-features = false, features = [
"v1_34",
k8s-openapi = { version = "0.27", default-features = false, features = [
"v1_35",
] }
kube = { version = "2.0", default-features = false, features = [
kube = { version = "3.0", default-features = false, features = [
"client",
"jsonpatch",
"runtime",
Expand All @@ -50,22 +49,22 @@ prometheus = "0.13"
prusto = "0.5"
pyo3 = { version = "0.27", features = ["auto-initialize"] }
rand = "0.9"
redis = { version = "0.32", default-features = false, features = [
redis = { version = "1.0", default-features = false, features = [
"acl",
"tokio-rustls-comp",
"cluster",
"cluster-async",
"connection-manager",
"keep-alive",
"script",
] }
regex = "1.10"
reqwest = { version = "0.12", default-features = false, features = [
"rustls-tls",
reqwest = { version = "0.13", default-features = false, features = [
"rustls",
"gzip",
"json",
"cookies",
] }
rustls = "0.23"
rstest = "0.26"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] }
Expand Down
6 changes: 3 additions & 3 deletions nix/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
"homepage": "",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "573c650e8a14b2faa0041645ab18aed7e60f0c9a",
"sha256": "0qg99zj0gb0pc6sjlkmwhk1c1xz14qxmk6gamgfmcxpsfdp5vn72",
"rev": "61db79b0c6b838d9894923920b612048e1201926",
"sha256": "1zihs797ra009jl5knx7p4qk38wa1rmarfy286y8ywjv5wbx35l7",
"type": "tarball",
"url": "https://github.com/NixOS/nixpkgs/archive/573c650e8a14b2faa0041645ab18aed7e60f0c9a.tar.gz",
"url": "https://github.com/NixOS/nixpkgs/archive/61db79b0c6b838d9894923920b612048e1201926.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
}
}
11 changes: 9 additions & 2 deletions trino-lb-persistence/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ pub enum Error {

#[snafu(display("Invalid response from compare and set lua script. Expected either 0 or 1"))]
InvalidCASScriptResponse { response: u64 },

#[snafu(display("Failed to list queued queries for cluster group {cluster_group:?}"))]
ListQueuedQueries {
source: RedisError,
cluster_group: String,
},
}

/// This Redis implementation works against Redis clusters. It uses a single connection that is shared between all
Expand Down Expand Up @@ -143,8 +149,8 @@ impl RedisPersistence<ConnectionManager> {
info!(redis_host, "Using redis persistence");

let redis_config = ConnectionManagerConfig::new()
.set_connection_timeout(REDIS_CONNECTION_TIMEOUT)
.set_response_timeout(REDIS_RESPONSE_TIMEOUT);
.set_connection_timeout(Some(REDIS_CONNECTION_TIMEOUT))
.set_response_timeout(Some(REDIS_RESPONSE_TIMEOUT));

let client = Client::open(config.endpoint.as_str()).context(CreateClientSnafu)?;
let connection = client
Expand Down Expand Up @@ -539,6 +545,7 @@ where
if let Ok(mut queued) = connection.sscan(queued_query_set_name(cluster_group)).await {
// TODO: Await `load_queued_query` in parallel (if possible) or add them to a Vec to bulk-delete afterwards
while let Some(key) = queued.next_item().await {
let key = key.with_context(|_| ListQueuedQueriesSnafu { cluster_group })?;
let queued_query = self.load_queued_query(&key).await?;
if &queued_query.last_accessed < not_accessed_after {
self.remove_queued_query(&queued_query).await?;
Expand Down
1 change: 1 addition & 0 deletions trino-lb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ rand.workspace = true
redis.workspace = true
regex.workspace = true
reqwest.workspace = true
rustls.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
serde.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion trino-lb/src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub async fn start_http_server(
Ok(())
}

async fn graceful_shutdown(handle: Handle) {
async fn graceful_shutdown(handle: Handle<SocketAddr>) {
wait_for_shutdown_signal().await;

info!("Shutting down gracefully");
Expand Down
6 changes: 6 additions & 0 deletions trino-lb/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub enum Error {
/// We can not use the `#[tokio::main]` macro, as we need at least 3 worker threads because of some magic happening
/// in metric collection that are related to <https://github.com/open-telemetry/opentelemetry-rust/issues/1376#issuecomment-1816813128>
fn main() -> Result<(), MainError> {
// To prevent `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point`,
// see https://github.com/rustls/rustls/issues/1938 for details
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.map_err(|_| Error::InstallRustlsCryptoProvider)?;

const ENV_WORKER_THREADS: &str = "TOKIO_WORKER_THREADS";

let worker_threads = match std::env::var(ENV_WORKER_THREADS) {
Expand Down
9 changes: 4 additions & 5 deletions trino-lb/src/scaling/stackable.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use k8s_openapi::{apimachinery::pkg::apis::meta::v1::Time, jiff::Timestamp};
use kube::{
Api, Client, Discovery,
api::{Patch, PatchParams},
Expand Down Expand Up @@ -415,7 +414,7 @@ impl ScalerTrait for StackableScaler {
}
}

fn elapsed_seconds_since(datetime: DateTime<Utc>) -> i64 {
let now = Utc::now();
(now - datetime).num_seconds()
fn elapsed_seconds_since(datetime: Timestamp) -> i64 {
let now = Timestamp::now();
(now - datetime).get_seconds()
}
Loading