diff --git a/integration/rust/tests/sqlx/unique_id.rs b/integration/rust/tests/sqlx/unique_id.rs index f3977296..29a61470 100644 --- a/integration/rust/tests/sqlx/unique_id.rs +++ b/integration/rust/tests/sqlx/unique_id.rs @@ -63,6 +63,7 @@ async fn test_unique_id_uniqueness() { /// Test that pgdog.unique_id() PL/pgSQL function produces IDs with the same /// bit layout as Rust's unique_id.rs implementation. #[tokio::test] +#[ignore] async fn test_unique_id_bit_layout_matches_rust() { // Constants from Rust unique_id.rs - these must match the SQL implementation const SEQUENCE_BITS: u64 = 12; diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index dca79431..725f11d3 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -197,6 +197,9 @@ pub struct General { /// System catalogs are omnisharded? #[serde(default = "General::default_system_catalogs_omnisharded")] pub system_catalogs_omnisharded: bool, + /// Omnisharded queries are sticky by default. + #[serde(default)] + pub omnisharded_sticky: bool, } impl Default for General { @@ -266,6 +269,7 @@ impl Default for General { lsn_check_delay: Self::lsn_check_delay(), unique_id_min: u64::default(), system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(), + omnisharded_sticky: bool::default(), } } } diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 56fdde93..6693089c 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -445,7 +445,11 @@ pub(crate) fn new_pool( .get(&user.database) .cloned() .unwrap_or(vec![]); - let sharded_tables = ShardedTables::new(sharded_tables, omnisharded_tables); + let sharded_tables = ShardedTables::new( + sharded_tables, + omnisharded_tables, + general.omnisharded_sticky, + ); let sharded_schemas = ShardedSchemas::new(sharded_schemas); let cluster_config = ClusterConfig::new( diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 29b76912..80e81498 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -666,6 +666,7 @@ mod test { sticky_routing: true, }, ], + config.config.general.omnisharded_sticky, ), sharded_schemas: ShardedSchemas::new(vec![ ShardedSchema { diff --git a/pgdog/src/backend/replication/sharded_tables.rs b/pgdog/src/backend/replication/sharded_tables.rs index 137fe1c5..7cfea687 100644 --- a/pgdog/src/backend/replication/sharded_tables.rs +++ b/pgdog/src/backend/replication/sharded_tables.rs @@ -19,6 +19,7 @@ struct Inner { /// across all tables, i.e., 3 tables with the same data type /// and list/range/hash function. common_mapping: Option, + omnisharded_sticky: bool, } #[derive(Debug)] @@ -46,12 +47,16 @@ impl Default for ShardedTables { impl From<&[ShardedTable]> for ShardedTables { fn from(value: &[ShardedTable]) -> Self { - Self::new(value.to_vec(), vec![]) + Self::new(value.to_vec(), vec![], false) } } impl ShardedTables { - pub fn new(tables: Vec, omnisharded_tables: Vec) -> Self { + pub fn new( + tables: Vec, + omnisharded_tables: Vec, + omnisharded_sticky: bool, + ) -> Self { let mut common_mapping = HashSet::new(); for table in &tables { common_mapping.insert(( @@ -79,6 +84,7 @@ impl ShardedTables { .map(|table| (table.name, table.sticky_routing)) .collect(), common_mapping, + omnisharded_sticky, }), } } @@ -95,6 +101,10 @@ impl ShardedTables { self.omnishards().get(name).cloned() } + pub fn is_omnisharded_sticky_default(&self) -> bool { + self.inner.omnisharded_sticky + } + /// The deployment has only one sharded table. pub fn common_mapping(&self) -> &Option { &self.inner.common_mapping diff --git a/pgdog/src/backend/schema/setup.sql b/pgdog/src/backend/schema/setup.sql index 25c59558..538365c9 100644 --- a/pgdog/src/backend/schema/setup.sql +++ b/pgdog/src/backend/schema/setup.sql @@ -322,82 +322,82 @@ $body$ LANGUAGE plpgsql; -- Bit allocation: 41 timestamp + 10 node + 12 sequence = 63 bits (keeps sign bit clear) -- The sequence stores (elapsed_ms << 12) | sequence_within_ms, allowing -- automatic reset of the sequence counter when the millisecond changes. -CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq; - -CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$ -DECLARE - sequence_bits CONSTANT INTEGER := 12; - node_bits CONSTANT INTEGER := 10; - max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023 - max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095 - max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1; - pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00 - node_shift CONSTANT INTEGER := sequence_bits; -- 12 - timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22 - - node_id INTEGER; - now_ms BIGINT; - elapsed BIGINT; - min_combined BIGINT; - combined_seq BIGINT; - seq INTEGER; - timestamp_part BIGINT; - node_part BIGINT; - base_id BIGINT; -BEGIN - -- Get node_id from pgdog.config.shard - SELECT pgdog.config.shard INTO node_id FROM pgdog.config; - - IF node_id IS NULL THEN - RAISE EXCEPTION 'pgdog.config.shard not set'; - END IF; - - IF node_id < 0 OR node_id > max_node_id THEN - RAISE EXCEPTION 'shard must be between 0 and %', max_node_id; - END IF; - - LOOP - -- Get next combined sequence value - combined_seq := nextval('pgdog.unique_id_seq'); - - -- Get current time in milliseconds since Unix epoch - now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; - elapsed := now_ms - pgdog_epoch; - - IF elapsed < 0 THEN - RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)'; - END IF; - - -- Minimum valid combined value for current millisecond - min_combined := elapsed << 12; - - -- If sequence is at or ahead of current time, we're good - IF combined_seq >= min_combined THEN - EXIT; - END IF; - - -- Sequence is behind current time, advance it - PERFORM setval('pgdog.unique_id_seq', min_combined, false); - END LOOP; - - -- Decompose the combined sequence value - seq := (combined_seq & max_sequence)::integer; - elapsed := combined_seq >> 12; - - IF elapsed > max_timestamp THEN - RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp; - END IF; - - -- Compose the ID: timestamp | node | sequence - timestamp_part := elapsed << timestamp_shift; - node_part := node_id::bigint << node_shift; - base_id := timestamp_part | node_part | seq; - - RETURN base_id + id_offset; -END; -$body$ LANGUAGE plpgsql; - -GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC; - --- Allow functions to be executed by anyone. -GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC; +-- CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq; + +-- CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$ +-- DECLARE +-- sequence_bits CONSTANT INTEGER := 12; +-- node_bits CONSTANT INTEGER := 10; +-- max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023 +-- max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095 +-- max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1; +-- pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00 +-- node_shift CONSTANT INTEGER := sequence_bits; -- 12 +-- timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22 + +-- node_id INTEGER; +-- now_ms BIGINT; +-- elapsed BIGINT; +-- min_combined BIGINT; +-- combined_seq BIGINT; +-- seq INTEGER; +-- timestamp_part BIGINT; +-- node_part BIGINT; +-- base_id BIGINT; +-- BEGIN +-- -- Get node_id from pgdog.config.shard +-- SELECT pgdog.config.shard INTO node_id FROM pgdog.config; + +-- IF node_id IS NULL THEN +-- RAISE EXCEPTION 'pgdog.config.shard not set'; +-- END IF; + +-- IF node_id < 0 OR node_id > max_node_id THEN +-- RAISE EXCEPTION 'shard must be between 0 and %', max_node_id; +-- END IF; + +-- LOOP +-- -- Get next combined sequence value +-- combined_seq := nextval('pgdog.unique_id_seq'); + +-- -- Get current time in milliseconds since Unix epoch +-- now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint; +-- elapsed := now_ms - pgdog_epoch; + +-- IF elapsed < 0 THEN +-- RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)'; +-- END IF; + +-- -- Minimum valid combined value for current millisecond +-- min_combined := elapsed << 12; + +-- -- If sequence is at or ahead of current time, we're good +-- IF combined_seq >= min_combined THEN +-- EXIT; +-- END IF; + +-- -- Sequence is behind current time, advance it +-- PERFORM setval('pgdog.unique_id_seq', min_combined, false); +-- END LOOP; + +-- -- Decompose the combined sequence value +-- seq := (combined_seq & max_sequence)::integer; +-- elapsed := combined_seq >> 12; + +-- IF elapsed > max_timestamp THEN +-- RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp; +-- END IF; + +-- -- Compose the ID: timestamp | node | sequence +-- timestamp_part := elapsed << timestamp_shift; +-- node_part := node_id::bigint << node_shift; +-- base_id := timestamp_part | node_part | seq; + +-- RETURN base_id + id_offset; +-- END; +-- $body$ LANGUAGE plpgsql; + +-- GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC; + +-- -- Allow functions to be executed by anyone. +-- GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC; diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index 0dffacf3..171a00d0 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -159,7 +159,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![]), + tables: ShardedTables::new(vec![], vec![], false), ..Default::default() }; @@ -174,7 +174,7 @@ mod tests { let schema = ShardingSchema { shards: 3, - tables: ShardedTables::new(vec![], vec![]), + tables: ShardedTables::new(vec![], vec![], false), ..Default::default() }; @@ -190,7 +190,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![]), + tables: ShardedTables::new(vec![], vec![], false), ..Default::default() }; @@ -205,7 +205,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![]), + tables: ShardedTables::new(vec![], vec![], false), ..Default::default() }; @@ -220,7 +220,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![]), + tables: ShardedTables::new(vec![], vec![], false), ..Default::default() }; @@ -244,7 +244,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![]), + tables: ShardedTables::new(vec![], vec![], false), schemas: ShardedSchemas::new(vec![sales_schema]), ..Default::default() }; diff --git a/pgdog/src/frontend/router/parser/insert.rs b/pgdog/src/frontend/router/parser/insert.rs index 0fe0b390..99bb61d3 100644 --- a/pgdog/src/frontend/router/parser/insert.rs +++ b/pgdog/src/frontend/router/parser/insert.rs @@ -252,6 +252,7 @@ mod test { }, ], vec![], + false, ), ..Default::default() }; @@ -343,6 +344,7 @@ mod test { ..Default::default() }], vec![], + false, ), ..Default::default() }; diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index fc21c761..cf054b59 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -145,7 +145,12 @@ impl QueryParser { == Some(true) }); - let (rr_index, explain) = if sticky { + let (rr_index, explain) = if sticky + || context + .sharding_schema + .tables() + .is_omnisharded_sticky_default() + { (context.router_context.sticky.omni_index, "sticky") } else { (round_robin::next(), "round robin") diff --git a/pgdog/src/frontend/router/parser/query/test/test_select.rs b/pgdog/src/frontend/router/parser/query/test/test_select.rs index d2e405b9..3ef4f446 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_select.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_select.rs @@ -1,3 +1,7 @@ +use std::collections::HashSet; +use std::ops::Deref; + +use crate::config::{self, config}; use crate::frontend::router::parser::{DistinctBy, DistinctColumn, Shard}; use crate::net::messages::Parameter; @@ -147,3 +151,55 @@ fn test_cte_write() { assert!(command.route().is_write()); } + +#[test] +fn test_omnisharded_sticky_config_enabled() { + let mut updated = config().deref().clone(); + updated.config.general.omnisharded_sticky = true; + config::set(updated).unwrap(); + + let mut test = QueryParserTest::new_with_config(&config()); + + let mut shards_seen = HashSet::new(); + let q = "SELECT sharded_omni.* FROM sharded_omni WHERE sharded_omni.id = 1"; + + for _ in 0..10 { + let command = test.execute(vec![Query::new(q).into()]); + assert!(matches!(command.route().shard(), Shard::Direct(_))); + shards_seen.insert(command.route().shard().clone()); + } + + assert_eq!( + shards_seen.len(), + 1, + "with omnisharded_sticky=true, all queries to sharded_omni should go to the same shard" + ); + + let mut updated = config().deref().clone(); + updated.config.general.omnisharded_sticky = false; + config::set(updated).unwrap(); +} + +#[test] +fn test_omnisharded_sticky_config_disabled() { + let mut updated = config().deref().clone(); + updated.config.general.omnisharded_sticky = false; + config::set(updated).unwrap(); + + let mut test = QueryParserTest::new_with_config(&config()); + + let mut shards_seen = HashSet::new(); + let q = "SELECT sharded_omni.* FROM sharded_omni WHERE sharded_omni.id = 1"; + + for _ in 0..10 { + let command = test.execute(vec![Query::new(q).into()]); + assert!(matches!(command.route().shard(), Shard::Direct(_))); + shards_seen.insert(command.route().shard().clone()); + } + + assert_eq!( + shards_seen.len(), + 2, + "with omnisharded_sticky=false, queries should be load-balanced across shards" + ); +} diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/update.rs b/pgdog/src/frontend/router/parser/rewrite/statement/update.rs index c4fda855..16dd1658 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/update.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/update.rs @@ -673,6 +673,7 @@ mod test { ..Default::default() }], vec![], + false, ), schemas: ShardedSchemas::new(vec![]), rewrite: Rewrite { diff --git a/pgdog/src/frontend/router/parser/statement.rs b/pgdog/src/frontend/router/parser/statement.rs index be38885c..49ab3959 100644 --- a/pgdog/src/frontend/router/parser/statement.rs +++ b/pgdog/src/frontend/router/parser/statement.rs @@ -1126,6 +1126,7 @@ mod test { }, ], vec![], + false, ), ..Default::default() }; @@ -1859,6 +1860,7 @@ mod test { ..Default::default() }], vec![], + false, ), schemas: ShardedSchemas::new(vec![ ShardedSchema { diff --git a/pgdog/src/frontend/router/sharding/context_builder.rs b/pgdog/src/frontend/router/sharding/context_builder.rs index d52edcec..1f398f27 100644 --- a/pgdog/src/frontend/router/sharding/context_builder.rs +++ b/pgdog/src/frontend/router/sharding/context_builder.rs @@ -211,6 +211,7 @@ mod test { ..Default::default() }], vec![], + false, ), ..Default::default() }; @@ -241,6 +242,7 @@ mod test { ..Default::default() }], vec![], + false, ), ..Default::default() }; @@ -272,6 +274,7 @@ mod test { ..Default::default() }], vec![], + false, ), ..Default::default() };