Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions integration/go/go_pgx/pg_tests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func executeTimeoutTest(t *testing.T) {
c := make(chan int, 1)

go func() {
err := pgSleepOneSecond(conn, ctx)
err := pgSleepTwoSecond(conn, ctx)
assert.NotNil(t, err)

defer conn.Close(context.Background())
Expand All @@ -240,8 +240,8 @@ func executeTimeoutTest(t *testing.T) {
}

// Sleep for 1 second.
func pgSleepOneSecond(conn *pgx.Conn, ctx context.Context) (err error) {
_, err = conn.Exec(ctx, "SELECT pg_sleep(1)")
func pgSleepTwoSecond(conn *pgx.Conn, ctx context.Context) (err error) {
_, err = conn.Exec(ctx, "SELECT pg_sleep(2)")
return err
}

Expand Down
5 changes: 3 additions & 2 deletions integration/go/go_pgx/sharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ func TestShardedTwoPc(t *testing.T) {
assert.NoError(t, err)
}

// +3 is for schema sync
assertShowField(t, "SHOW STATS", "total_xact_2pc_count", 200, "pgdog_2pc", "pgdog_sharded", 0, "primary")
assertShowField(t, "SHOW STATS", "total_xact_2pc_count", 200, "pgdog_2pc", "pgdog_sharded", 1, "primary")
assertShowField(t, "SHOW STATS", "total_xact_count", 401, "pgdog_2pc", "pgdog_sharded", 0, "primary") // PREPARE, COMMIT for each transaction + TRUNCATE
assertShowField(t, "SHOW STATS", "total_xact_count", 401, "pgdog_2pc", "pgdog_sharded", 1, "primary")
assertShowField(t, "SHOW STATS", "total_xact_count", 401+3, "pgdog_2pc", "pgdog_sharded", 0, "primary") // PREPARE, COMMIT for each transaction + TRUNCATE
assertShowField(t, "SHOW STATS", "total_xact_count", 401+3, "pgdog_2pc", "pgdog_sharded", 1, "primary")

for i := range 200 {
rows, err := conn.Query(
Expand Down
19 changes: 14 additions & 5 deletions integration/rust/tests/integration/avg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rust::setup::connections_sqlx;
use rust::setup::{admin_sqlx, connections_sqlx};
use sqlx::{Connection, Executor, PgConnection, Row};

#[tokio::test]
Expand All @@ -17,12 +17,15 @@ async fn avg_merges_with_helper_count() -> Result<(), Box<dyn std::error::Error>

for shard in [0, 1] {
let comment = format!(
"/* pgdog_shard: {} */ CREATE TABLE avg_reduce_test(price DOUBLE PRECISION)",
"/* pgdog_shard: {} */ CREATE TABLE avg_reduce_test(price DOUBLE PRECISION, customer_id BIGINT)",
shard
);
sharded.execute(comment.as_str()).await?;
}

// Make sure sharded table is loaded in schema.
admin_sqlx().await.execute("RELOAD").await?;

// Insert data on each shard so the query spans multiple shards.
sharded
.execute("/* pgdog_shard: 0 */ INSERT INTO avg_reduce_test(price) VALUES (10.0), (14.0)")
Expand Down Expand Up @@ -73,12 +76,14 @@ async fn avg_without_helper_should_still_merge() -> Result<(), Box<dyn std::erro

for shard in [0, 1] {
let comment = format!(
"/* pgdog_shard: {} */ CREATE TABLE avg_rewrite_expectation(price DOUBLE PRECISION)",
"/* pgdog_shard: {} */ CREATE TABLE avg_rewrite_expectation(price DOUBLE PRECISION, customer_id BIGINT)",
shard
);
sharded.execute(comment.as_str()).await?;
}

admin_sqlx().await.execute("RELOAD").await?;

sharded
.execute(
"/* pgdog_shard: 0 */ INSERT INTO avg_rewrite_expectation(price) VALUES (10.0), (14.0)",
Expand Down Expand Up @@ -145,12 +150,14 @@ async fn avg_multiple_columns_should_merge() -> Result<(), Box<dyn std::error::E

for shard in [0, 1] {
let comment = format!(
"/* pgdog_shard: {} */ CREATE TABLE avg_multi_column(price DOUBLE PRECISION, discount DOUBLE PRECISION)",
"/* pgdog_shard: {} */ CREATE TABLE avg_multi_column(price DOUBLE PRECISION, discount DOUBLE PRECISION, customer_id BIGINT)",
shard
);
sharded.execute(comment.as_str()).await?;
}

admin_sqlx().await.execute("RELOAD").await?;

sharded
.execute(
"/* pgdog_shard: 0 */ INSERT INTO avg_multi_column(price, discount) VALUES (10.0, 1.0), (14.0, 3.0)",
Expand Down Expand Up @@ -231,12 +238,14 @@ async fn avg_prepared_statement_should_merge() -> Result<(), Box<dyn std::error:

for shard in [0, 1] {
let comment = format!(
"/* pgdog_shard: {} */ CREATE TABLE avg_prepared_params(price DOUBLE PRECISION)",
"/* pgdog_shard: {} */ CREATE TABLE avg_prepared_params(price DOUBLE PRECISION, customer_id BIGINT)",
shard
);
sharded.execute(comment.as_str()).await?;
}

admin_sqlx().await.execute("RELOAD").await?;

sharded
.execute(
"/* pgdog_shard: 0 */ INSERT INTO avg_prepared_params(price) VALUES (10.0), (14.0)",
Expand Down
7 changes: 5 additions & 2 deletions integration/rust/tests/integration/explain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use rust::setup::connections_tokio;
use rust::setup::{admin_sqlx, connections_tokio};
use sqlx::Executor;
use tokio_postgres::SimpleQueryMessage;

#[tokio::test]
Expand All @@ -16,12 +17,14 @@ async fn explain_routing_annotations_surface() -> Result<(), Box<dyn std::error:

for shard in [0, 1] {
let create = format!(
"/* pgdog_shard: {} */ CREATE TABLE explain_avg_test(price DOUBLE PRECISION)",
"/* pgdog_shard: {} */ CREATE TABLE explain_avg_test(price DOUBLE PRECISION, customer_id BIGINT)",
shard
);
sharded.simple_query(create.as_str()).await?;
}

admin_sqlx().await.execute("RELOAD").await?;

sharded
.simple_query(
"/* pgdog_shard: 0 */ INSERT INTO explain_avg_test(price) VALUES (10.0), (14.0)",
Expand Down
4 changes: 3 additions & 1 deletion integration/rust/tests/integration/per_stmt_routing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rust::setup::connections_sqlx;
use rust::setup::{admin_sqlx, connections_sqlx};
use sqlx::{Acquire, Executor, Row};

#[tokio::test]
Expand All @@ -12,6 +12,8 @@ async fn per_stmt_routing() -> Result<(), Box<dyn std::error::Error>> {
)
.await?;

admin_sqlx().await.execute("RELOAD").await?;

sharded.execute("TRUNCATE TABLE per_stmt_routing").await?;

for i in 0..50 {
Expand Down
28 changes: 19 additions & 9 deletions integration/rust/tests/integration/shard_consistency.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rust::setup::connections_sqlx;
use rust::setup::{admin_sqlx, connections_sqlx};
use sqlx::{Executor, Row};

#[tokio::test]
Expand All @@ -15,14 +15,16 @@ async fn shard_consistency_validator() -> Result<(), Box<dyn std::error::Error>>
// Create different table schemas on each shard to trigger validator errors
// Shard 0: table with 2 columns (id, name)
sharded
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100))")
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100), customer_id BIGINT)")
.await?;

// Shard 1: table with 3 columns (id, name, extra) - different column count
sharded
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT)")
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT, customer_id BIGINT)")
.await?;

admin_sqlx().await.execute("RELOAD").await?;

// Insert some test data on each shard
sharded
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_test (id, name) VALUES (1, 'shard0_row1'), (2, 'shard0_row2')")
Expand Down Expand Up @@ -76,14 +78,16 @@ async fn shard_consistency_validator_column_names() -> Result<(), Box<dyn std::e
// Create tables with same column count but different column names
// Shard 0: columns named (id, name)
sharded
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, name VARCHAR(100))")
.execute("/* pgdog_shard: 0 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, name VARCHAR(100), customer_id BIGINT)")
.await?;

// Shard 1: columns named (id, username) - different column name
sharded
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, username VARCHAR(100))")
.execute("/* pgdog_shard: 1 */ CREATE TABLE shard_name_test (id BIGINT PRIMARY KEY, username VARCHAR(100), customer_id BIGINT)")
.await?;

admin_sqlx().await.execute("RELOAD").await?;

// Insert test data
sharded
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_name_test (id, name) VALUES (1, 'test1')")
Expand Down Expand Up @@ -138,9 +142,11 @@ async fn shard_consistency_validator_success() -> Result<(), Box<dyn std::error:

// Create identical table schemas on both shards
sharded
.execute("CREATE TABLE shard_consistent_test (id BIGINT PRIMARY KEY, name VARCHAR(100))")
.execute("CREATE TABLE shard_consistent_test (id BIGINT PRIMARY KEY, name VARCHAR(100), customer_id BIGINT)")
.await?;

admin_sqlx().await.execute("RELOAD").await?;

// Insert test data
sharded
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_consistent_test (id, name) VALUES (1, 'shard0_data'), (2, 'shard0_more')")
Expand Down Expand Up @@ -190,9 +196,11 @@ async fn shard_consistency_data_row_validator_prepared_statement()
// Create tables with same schema but we'll query them differently to trigger DataRow validation
// Both tables have same structure so RowDescription will match initially
sharded
.execute("CREATE TABLE shard_datarow_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT DEFAULT 'default')")
.execute("CREATE TABLE shard_datarow_test (id BIGINT PRIMARY KEY, name VARCHAR(100), extra TEXT DEFAULT 'default', customer_id BIGINT)")
.await?;

admin_sqlx().await.execute("RELOAD").await?;

// Insert test data
sharded
.execute("/* pgdog_shard: 0 */ INSERT INTO shard_datarow_test (id, name) VALUES (1, 'test1'), (2, 'test2')")
Expand All @@ -208,13 +216,15 @@ async fn shard_consistency_data_row_validator_prepared_statement()

// Actually, let's create a simpler test - use views with different column counts
sharded
.execute("/* pgdog_shard: 0 */ CREATE VIEW datarow_view AS SELECT id, name FROM shard_datarow_test")
.execute("/* pgdog_shard: 0 */ CREATE VIEW datarow_view AS SELECT id, name, customer_id FROM shard_datarow_test")
.await?;

sharded
.execute("/* pgdog_shard: 1 */ CREATE VIEW datarow_view AS SELECT id, name, extra FROM shard_datarow_test")
.execute("/* pgdog_shard: 1 */ CREATE VIEW datarow_view AS SELECT id, name, extra, customer_id FROM shard_datarow_test")
.await?;

admin_sqlx().await.execute("RELOAD").await?;

// Now prepare a statement against the views
let result = sharded
.prepare("SELECT * FROM datarow_view WHERE id > $1 ORDER BY id")
Expand Down
28 changes: 20 additions & 8 deletions integration/rust/tests/integration/stddev.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeSet;

use ordered_float::OrderedFloat;
use rust::setup::{connections_sqlx, connections_tokio};
use rust::setup::{admin_sqlx, connections_sqlx, connections_tokio};
use sqlx::{Connection, Executor, PgConnection, Row, postgres::PgPool};

const SHARD_URLS: [&str; 2] = [
Expand Down Expand Up @@ -55,7 +55,7 @@ async fn stddev_pop_merges_with_helpers() -> Result<(), Box<dyn std::error::Erro
reset_table(
&sharded,
"stddev_pop_reduce_test",
"(price DOUBLE PRECISION)",
"(price DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

Expand Down Expand Up @@ -106,7 +106,7 @@ async fn stddev_samp_aliases_should_merge() -> Result<(), Box<dyn std::error::Er
reset_table(
&sharded,
"stddev_sample_reduce_test",
"(price DOUBLE PRECISION)",
"(price DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

Expand Down Expand Up @@ -157,7 +157,12 @@ async fn variance_variants_should_merge() -> Result<(), Box<dyn std::error::Erro
let conns = connections_sqlx().await;
let sharded = conns.get(1).cloned().unwrap();

reset_table(&sharded, "variance_reduce_test", "(price DOUBLE PRECISION)").await?;
reset_table(
&sharded,
"variance_reduce_test",
"(price DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

seed_stat_data(
&sharded,
Expand Down Expand Up @@ -222,7 +227,7 @@ async fn stddev_multiple_columns_should_merge() -> Result<(), Box<dyn std::error
reset_table(
&sharded,
"stddev_multi_column",
"(price DOUBLE PRECISION, discount DOUBLE PRECISION)",
"(price DOUBLE PRECISION, discount DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

Expand Down Expand Up @@ -290,7 +295,7 @@ async fn stddev_prepared_statement_should_merge() -> Result<(), Box<dyn std::err
reset_table(
&sharded,
"stddev_prepared_params",
"(price DOUBLE PRECISION)",
"(price DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

Expand Down Expand Up @@ -358,7 +363,7 @@ async fn stddev_distinct_should_error_until_supported() -> Result<(), Box<dyn st
reset_table(
&sharded,
"stddev_distinct_error",
"(price DOUBLE PRECISION)",
"(price DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

Expand Down Expand Up @@ -396,7 +401,12 @@ async fn stddev_distinct_future_expectation() -> Result<(), Box<dyn std::error::
let conns = connections_sqlx().await;
let sharded = conns.get(1).cloned().unwrap();

reset_table(&sharded, "stddev_distinct_test", "(price DOUBLE PRECISION)").await?;
reset_table(
&sharded,
"stddev_distinct_test",
"(price DOUBLE PRECISION, customer_id BIGINT)",
)
.await?;

seed_stat_data(
&sharded,
Expand Down Expand Up @@ -456,6 +466,8 @@ async fn reset_table(pool: &PgPool, table: &str, schema: &str) -> Result<(), sql
pool.execute(create_stmt.as_str()).await?;
}

admin_sqlx().await.execute("RELOAD").await?;

Ok(())
}

Expand Down
Loading
Loading