Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions pgdog-config/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,46 @@ impl Config {
}
}

let databases = self
.databases
.iter()
.map(|database| database.name.clone())
.collect::<HashSet<_>>();

// Automatically configure system catalogs
// as omnisharded.
if self.general.system_catalogs_omnisharded {
for database in databases {
let entry = tables.entry(database).or_insert_with(Vec::new);

for table in [
"pg_class",
"pg_attribute",
"pg_attrdef",
"pg_index",
"pg_constraint",
"pg_namespace",
"pg_database",
"pg_tablespace",
"pg_type",
"pg_proc",
"pg_operator",
"pg_cast",
"pg_enum",
"pg_range",
"pg_authid",
"pg_am",
] {
if entry.iter().find(|t| t.name == table).is_none() {
entry.push(OmnishardedTable {
name: table.to_string(),
sticky_routing: true,
});
}
}
}
}

tables
}

Expand Down Expand Up @@ -725,6 +765,7 @@ password = "users_admin_password"
[general]
host = "0.0.0.0"
port = 6432
system_catalogs_omnisharded = false

[[databases]]
name = "db1"
Expand Down Expand Up @@ -772,4 +813,66 @@ tables = ["table_x"]
assert_eq!(db2_tables[0].name, "table_x");
assert!(!db2_tables[0].sticky_routing);
}

#[test]
fn test_omnisharded_tables_system_catalogs() {
// Test with system_catalogs_omnisharded = true
let source_enabled = r#"
[general]
host = "0.0.0.0"
port = 6432
system_catalogs_omnisharded = true

[[databases]]
name = "db1"
host = "127.0.0.1"
port = 5432

[[omnisharded_tables]]
database = "db1"
tables = ["my_table"]
"#;

let config: Config = toml::from_str(source_enabled).unwrap();
let tables = config.omnisharded_tables();
let db1_tables = tables.get("db1").unwrap();

// Should include my_table plus system catalogs
assert!(db1_tables.iter().any(|t| t.name == "my_table"));
assert!(db1_tables.iter().any(|t| t.name == "pg_class"));
assert!(db1_tables.iter().any(|t| t.name == "pg_attribute"));
assert!(db1_tables.iter().any(|t| t.name == "pg_namespace"));
assert!(db1_tables.iter().any(|t| t.name == "pg_type"));

// System catalogs should have sticky_routing = true
let pg_class = db1_tables.iter().find(|t| t.name == "pg_class").unwrap();
assert!(pg_class.sticky_routing);

// Test with system_catalogs_omnisharded = false
let source_disabled = r#"
[general]
host = "0.0.0.0"
port = 6432
system_catalogs_omnisharded = false

[[databases]]
name = "db1"
host = "127.0.0.1"
port = 5432

[[omnisharded_tables]]
database = "db1"
tables = ["my_table"]
"#;

let config: Config = toml::from_str(source_disabled).unwrap();
let tables = config.omnisharded_tables();
let db1_tables = tables.get("db1").unwrap();

// Should only include my_table, no system catalogs
assert_eq!(db1_tables.len(), 1);
assert_eq!(db1_tables[0].name, "my_table");
assert!(!db1_tables.iter().any(|t| t.name == "pg_class"));
assert!(!db1_tables.iter().any(|t| t.name == "pg_attribute"));
}
}
8 changes: 8 additions & 0 deletions pgdog-config/src/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ pub struct General {
/// Minimum ID for unique ID generator.
#[serde(default)]
pub unique_id_min: u64,
/// System catalogs are omnisharded?
#[serde(default = "General::default_system_catalogs_omnisharded")]
pub system_catalogs_omnisharded: bool,
}

impl Default for General {
Expand Down Expand Up @@ -262,6 +265,7 @@ impl Default for General {
lsn_check_timeout: Self::lsn_check_timeout(),
lsn_check_delay: Self::lsn_check_delay(),
unique_id_min: u64::default(),
system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(),
}
}
}
Expand Down Expand Up @@ -421,6 +425,10 @@ impl General {
Self::env_or_default("PGDOG_SHUTDOWN_TIMEOUT", 60_000)
}

fn default_system_catalogs_omnisharded() -> bool {
Self::env_or_default("PGDOG_SYSTEM_CATALOGS_OMNISHARDED", true)
}

fn default_shutdown_termination_timeout() -> Option<u64> {
Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT")
}
Expand Down
Loading