Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration/rust/tests/sqlx/unique_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async fn test_unique_id_uniqueness() {
/// Test that pgdog.unique_id() PL/pgSQL function produces IDs with the same
/// bit layout as Rust's unique_id.rs implementation.
#[tokio::test]
#[ignore]
async fn test_unique_id_bit_layout_matches_rust() {
// Constants from Rust unique_id.rs - these must match the SQL implementation
const SEQUENCE_BITS: u64 = 12;
Expand Down
4 changes: 4 additions & 0 deletions pgdog-config/src/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ pub struct General {
/// System catalogs are omnisharded?
#[serde(default = "General::default_system_catalogs_omnisharded")]
pub system_catalogs_omnisharded: bool,
/// Omnisharded queries are sticky by default.
#[serde(default)]
pub omnisharded_sticky: bool,
}

impl Default for General {
Expand Down Expand Up @@ -266,6 +269,7 @@ impl Default for General {
lsn_check_delay: Self::lsn_check_delay(),
unique_id_min: u64::default(),
system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(),
omnisharded_sticky: bool::default(),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pgdog/src/backend/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,11 @@ pub(crate) fn new_pool(
.get(&user.database)
.cloned()
.unwrap_or(vec![]);
let sharded_tables = ShardedTables::new(sharded_tables, omnisharded_tables);
let sharded_tables = ShardedTables::new(
sharded_tables,
omnisharded_tables,
general.omnisharded_sticky,
);
let sharded_schemas = ShardedSchemas::new(sharded_schemas);

let cluster_config = ClusterConfig::new(
Expand Down
1 change: 1 addition & 0 deletions pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ mod test {
sticky_routing: true,
},
],
config.config.general.omnisharded_sticky,
),
sharded_schemas: ShardedSchemas::new(vec![
ShardedSchema {
Expand Down
14 changes: 12 additions & 2 deletions pgdog/src/backend/replication/sharded_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct Inner {
/// across all tables, i.e., 3 tables with the same data type
/// and list/range/hash function.
common_mapping: Option<CommonMapping>,
omnisharded_sticky: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -46,12 +47,16 @@ impl Default for ShardedTables {

impl From<&[ShardedTable]> for ShardedTables {
fn from(value: &[ShardedTable]) -> Self {
Self::new(value.to_vec(), vec![])
Self::new(value.to_vec(), vec![], false)
}
}

impl ShardedTables {
pub fn new(tables: Vec<ShardedTable>, omnisharded_tables: Vec<OmnishardedTable>) -> Self {
pub fn new(
tables: Vec<ShardedTable>,
omnisharded_tables: Vec<OmnishardedTable>,
omnisharded_sticky: bool,
) -> Self {
let mut common_mapping = HashSet::new();
for table in &tables {
common_mapping.insert((
Expand Down Expand Up @@ -79,6 +84,7 @@ impl ShardedTables {
.map(|table| (table.name, table.sticky_routing))
.collect(),
common_mapping,
omnisharded_sticky,
}),
}
}
Expand All @@ -95,6 +101,10 @@ impl ShardedTables {
self.omnishards().get(name).cloned()
}

pub fn is_omnisharded_sticky_default(&self) -> bool {
self.inner.omnisharded_sticky
}

/// The deployment has only one sharded table.
pub fn common_mapping(&self) -> &Option<CommonMapping> {
&self.inner.common_mapping
Expand Down
158 changes: 79 additions & 79 deletions pgdog/src/backend/schema/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -322,82 +322,82 @@ $body$ LANGUAGE plpgsql;
-- Bit allocation: 41 timestamp + 10 node + 12 sequence = 63 bits (keeps sign bit clear)
-- The sequence stores (elapsed_ms << 12) | sequence_within_ms, allowing
-- automatic reset of the sequence counter when the millisecond changes.
CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq;

CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$
DECLARE
sequence_bits CONSTANT INTEGER := 12;
node_bits CONSTANT INTEGER := 10;
max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023
max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095
max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1;
pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00
node_shift CONSTANT INTEGER := sequence_bits; -- 12
timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22

node_id INTEGER;
now_ms BIGINT;
elapsed BIGINT;
min_combined BIGINT;
combined_seq BIGINT;
seq INTEGER;
timestamp_part BIGINT;
node_part BIGINT;
base_id BIGINT;
BEGIN
-- Get node_id from pgdog.config.shard
SELECT pgdog.config.shard INTO node_id FROM pgdog.config;

IF node_id IS NULL THEN
RAISE EXCEPTION 'pgdog.config.shard not set';
END IF;

IF node_id < 0 OR node_id > max_node_id THEN
RAISE EXCEPTION 'shard must be between 0 and %', max_node_id;
END IF;

LOOP
-- Get next combined sequence value
combined_seq := nextval('pgdog.unique_id_seq');

-- Get current time in milliseconds since Unix epoch
now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;
elapsed := now_ms - pgdog_epoch;

IF elapsed < 0 THEN
RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)';
END IF;

-- Minimum valid combined value for current millisecond
min_combined := elapsed << 12;

-- If sequence is at or ahead of current time, we're good
IF combined_seq >= min_combined THEN
EXIT;
END IF;

-- Sequence is behind current time, advance it
PERFORM setval('pgdog.unique_id_seq', min_combined, false);
END LOOP;

-- Decompose the combined sequence value
seq := (combined_seq & max_sequence)::integer;
elapsed := combined_seq >> 12;

IF elapsed > max_timestamp THEN
RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp;
END IF;

-- Compose the ID: timestamp | node | sequence
timestamp_part := elapsed << timestamp_shift;
node_part := node_id::bigint << node_shift;
base_id := timestamp_part | node_part | seq;

RETURN base_id + id_offset;
END;
$body$ LANGUAGE plpgsql;

GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC;

-- Allow functions to be executed by anyone.
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC;
-- CREATE SEQUENCE IF NOT EXISTS pgdog.unique_id_seq;

-- CREATE OR REPLACE FUNCTION pgdog.unique_id(id_offset BIGINT DEFAULT 0) RETURNS BIGINT AS $body$
-- DECLARE
-- sequence_bits CONSTANT INTEGER := 12;
-- node_bits CONSTANT INTEGER := 10;
-- max_node_id CONSTANT INTEGER := (1 << node_bits) - 1; -- 1023
-- max_sequence CONSTANT INTEGER := (1 << sequence_bits) - 1; -- 4095
-- max_timestamp CONSTANT BIGINT := (1::bigint << 41) - 1;
-- pgdog_epoch CONSTANT BIGINT := 1764184395000; -- Wednesday, November 26, 2025 11:13:15 AM GMT-08:00
-- node_shift CONSTANT INTEGER := sequence_bits; -- 12
-- timestamp_shift CONSTANT INTEGER := sequence_bits + node_bits; -- 22

-- node_id INTEGER;
-- now_ms BIGINT;
-- elapsed BIGINT;
-- min_combined BIGINT;
-- combined_seq BIGINT;
-- seq INTEGER;
-- timestamp_part BIGINT;
-- node_part BIGINT;
-- base_id BIGINT;
-- BEGIN
-- -- Get node_id from pgdog.config.shard
-- SELECT pgdog.config.shard INTO node_id FROM pgdog.config;

-- IF node_id IS NULL THEN
-- RAISE EXCEPTION 'pgdog.config.shard not set';
-- END IF;

-- IF node_id < 0 OR node_id > max_node_id THEN
-- RAISE EXCEPTION 'shard must be between 0 and %', max_node_id;
-- END IF;

-- LOOP
-- -- Get next combined sequence value
-- combined_seq := nextval('pgdog.unique_id_seq');

-- -- Get current time in milliseconds since Unix epoch
-- now_ms := (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint;
-- elapsed := now_ms - pgdog_epoch;

-- IF elapsed < 0 THEN
-- RAISE EXCEPTION 'Clock is before PgDog epoch (November 26, 2025)';
-- END IF;

-- -- Minimum valid combined value for current millisecond
-- min_combined := elapsed << 12;

-- -- If sequence is at or ahead of current time, we're good
-- IF combined_seq >= min_combined THEN
-- EXIT;
-- END IF;

-- -- Sequence is behind current time, advance it
-- PERFORM setval('pgdog.unique_id_seq', min_combined, false);
-- END LOOP;

-- -- Decompose the combined sequence value
-- seq := (combined_seq & max_sequence)::integer;
-- elapsed := combined_seq >> 12;

-- IF elapsed > max_timestamp THEN
-- RAISE EXCEPTION 'Timestamp overflow: % > %', elapsed, max_timestamp;
-- END IF;

-- -- Compose the ID: timestamp | node | sequence
-- timestamp_part := elapsed << timestamp_shift;
-- node_part := node_id::bigint << node_shift;
-- base_id := timestamp_part | node_part | seq;

-- RETURN base_id + id_offset;
-- END;
-- $body$ LANGUAGE plpgsql;

-- GRANT USAGE ON SEQUENCE pgdog.unique_id_seq TO PUBLIC;

-- -- Allow functions to be executed by anyone.
-- GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA pgdog TO PUBLIC;
12 changes: 6 additions & 6 deletions pgdog/src/frontend/router/parser/comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod tests {

let schema = ShardingSchema {
shards: 2,
tables: ShardedTables::new(vec![], vec![]),
tables: ShardedTables::new(vec![], vec![], false),
..Default::default()
};

Expand All @@ -174,7 +174,7 @@ mod tests {

let schema = ShardingSchema {
shards: 3,
tables: ShardedTables::new(vec![], vec![]),
tables: ShardedTables::new(vec![], vec![], false),
..Default::default()
};

Expand All @@ -190,7 +190,7 @@ mod tests {

let schema = ShardingSchema {
shards: 2,
tables: ShardedTables::new(vec![], vec![]),
tables: ShardedTables::new(vec![], vec![], false),
..Default::default()
};

Expand All @@ -205,7 +205,7 @@ mod tests {

let schema = ShardingSchema {
shards: 2,
tables: ShardedTables::new(vec![], vec![]),
tables: ShardedTables::new(vec![], vec![], false),
..Default::default()
};

Expand All @@ -220,7 +220,7 @@ mod tests {

let schema = ShardingSchema {
shards: 2,
tables: ShardedTables::new(vec![], vec![]),
tables: ShardedTables::new(vec![], vec![], false),
..Default::default()
};

Expand All @@ -244,7 +244,7 @@ mod tests {

let schema = ShardingSchema {
shards: 2,
tables: ShardedTables::new(vec![], vec![]),
tables: ShardedTables::new(vec![], vec![], false),
schemas: ShardedSchemas::new(vec![sales_schema]),
..Default::default()
};
Expand Down
2 changes: 2 additions & 0 deletions pgdog/src/frontend/router/parser/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ mod test {
},
],
vec![],
false,
),
..Default::default()
};
Expand Down Expand Up @@ -343,6 +344,7 @@ mod test {
..Default::default()
}],
vec![],
false,
),
..Default::default()
};
Expand Down
7 changes: 6 additions & 1 deletion pgdog/src/frontend/router/parser/query/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ impl QueryParser {
== Some(true)
});

let (rr_index, explain) = if sticky {
let (rr_index, explain) = if sticky
|| context
.sharding_schema
.tables()
.is_omnisharded_sticky_default()
{
(context.router_context.sticky.omni_index, "sticky")
} else {
(round_robin::next(), "round robin")
Expand Down
Loading
Loading