diff --git a/cli.sh b/cli.sh index c568f2870..f871de793 100755 --- a/cli.sh +++ b/cli.sh @@ -3,6 +3,8 @@ # Dev CLI. # +set -ex + # Connect to the admin database. function admin() { PGPASSWORD=pgdog psql -h 127.0.0.1 -p 6432 -U admin admin @@ -15,7 +17,7 @@ function admin() { # - protocol: simple|extended|prepared # function bench() { - PGPASSWORD=pgdog pgbench -h 127.0.0.1 -p 6432 -U pgdog pgdog --protocol ${2:-simple} -t 100000 -c 10 -P 1 -S + PGPASSWORD=pgdog pgbench -h 127.0.0.1 -p 6432 -U pgdog pgdog --protocol ${1:-simple} -t 100000000 -c 10 -P 1 -S } function bench_init() { diff --git a/pgdog-stats/src/server.rs b/pgdog-stats/src/server.rs index ec5472a6b..5b9abae07 100644 --- a/pgdog-stats/src/server.rs +++ b/pgdog-stats/src/server.rs @@ -95,6 +95,8 @@ pub struct Stats { pub last_checkout: Counts, pub pool_id: u64, pub memory: MemoryStats, + pub last_sent: u8, + pub last_received: u8, } impl Default for Stats { @@ -106,6 +108,8 @@ impl Default for Stats { last_checkout: Counts::default(), pool_id: 0, memory: MemoryStats::default(), + last_sent: 0, + last_received: 0, } } } diff --git a/pgdog/src/admin/show_server_memory.rs b/pgdog/src/admin/show_server_memory.rs index d6f6168d3..1f58a0f6c 100644 --- a/pgdog/src/admin/show_server_memory.rs +++ b/pgdog/src/admin/show_server_memory.rs @@ -38,15 +38,14 @@ impl Command for ShowServerMemory { let stats = stats(); for (_, server) in stats { let mut row = DataRow::new(); - let stats = server.stats; - let memory = &stats.memory; + let memory = &server.stats.memory; - row.add(stats.pool_id as i64) + row.add(server.stats.pool_id as i64) .add(server.addr.database_name.as_str()) .add(server.addr.user.as_str()) .add(server.addr.host.as_str()) .add(server.addr.port as i64) - .add(stats.id.pid as i64) + .add(server.stats.id.pid as i64) .add(memory.buffer.reallocs as i64) .add(memory.buffer.reclaims as i64) .add(memory.buffer.bytes_used as i64) diff --git a/pgdog/src/admin/show_servers.rs b/pgdog/src/admin/show_servers.rs index 76dd026dc..d2c77fab8 100644 --- a/pgdog/src/admin/show_servers.rs +++ b/pgdog/src/admin/show_servers.rs @@ -53,7 +53,7 @@ impl Command for ShowServers { Field::text("connect_time"), Field::text("request_time"), Field::numeric("remote_pid"), - // Field::bigint("client_id"), + Field::bigint("client_id"), Field::numeric("transactions"), Field::numeric("queries"), Field::numeric("rollbacks"), @@ -62,6 +62,8 @@ impl Command for ShowServers { Field::numeric("errors"), Field::numeric("bytes_received"), Field::numeric("bytes_sent"), + Field::text("last_sent"), + Field::text("last_received"), Field::numeric("age"), Field::text("application_name"), ], @@ -78,32 +80,45 @@ impl Command for ShowServers { let now_time = SystemTime::now(); for (_, server) in stats { - let stats = server.stats; - let age = now.duration_since(stats.created_at); - let request_age = now.duration_since(stats.last_used); + let age = now.duration_since(server.stats.created_at); + let request_age = now.duration_since(server.stats.last_used); let request_time = now_time - request_age; let dr = self .row .clone() - .add("pool_id", stats.pool_id) + .add("pool_id", server.stats.pool_id) .add("database", server.addr.database_name) .add("user", server.addr.user) .add("addr", server.addr.host.as_str()) .add("port", server.addr.port.to_string()) - .add("state", stats.state.to_string()) - .add("connect_time", format_time(stats.created_at_time.into())) + .add("state", server.stats.state.to_string()) + .add( + "connect_time", + format_time(server.stats.created_at_time.into()), + ) .add("request_time", format_time(request_time.into())) - .add("remote_pid", stats.id.pid as i64) - // .add("client_id", stats.client_id.map(|client| client.pid as i64)) - .add("transactions", stats.total.transactions) - .add("queries", stats.total.queries) - .add("rollbacks", stats.total.rollbacks) - .add("prepared_statements", stats.total.prepared_statements) - .add("healthchecks", stats.total.healthchecks) - .add("errors", stats.total.errors) - .add("bytes_received", stats.total.bytes_received) - .add("bytes_sent", stats.total.bytes_sent) + .add("remote_pid", server.stats.id.pid as i64) + .add( + "client_id", + server.stats.client_id.map(|client| client.pid as i64), + ) + .add("transactions", server.stats.total.transactions) + .add("queries", server.stats.total.queries) + .add("rollbacks", server.stats.total.rollbacks) + .add( + "prepared_statements", + server.stats.total.prepared_statements, + ) + .add("healthchecks", server.stats.total.healthchecks) + .add("errors", server.stats.total.errors) + .add("bytes_received", server.stats.total.bytes_received) + .add("bytes_sent", server.stats.total.bytes_sent) + .add("last_sent", (server.stats.last_sent as char).to_string()) + .add( + "last_received", + (server.stats.last_received as char).to_string(), + ) .add("age", age.as_secs() as i64) .add("application_name", server.application_name.as_str()) .data_row(); diff --git a/pgdog/src/admin/show_stats.rs b/pgdog/src/admin/show_stats.rs index febd5df52..e1e8fc24f 100644 --- a/pgdog/src/admin/show_stats.rs +++ b/pgdog/src/admin/show_stats.rs @@ -1,5 +1,6 @@ //! SHOW STATS. use crate::backend::databases::databases; +use crate::util::millis; use super::prelude::*; @@ -83,17 +84,17 @@ impl Command for ShowStats { .add(stat.server_assignment_count) .add(stat.received) .add(stat.sent) - .add(stat.xact_time.as_millis() as u64) - .add(stat.idle_xact_time.as_millis() as u64) - .add(stat.query_time.as_millis() as u64) - .add(stat.wait_time.as_millis() as u64) + .add(millis(stat.xact_time)) + .add(millis(stat.idle_xact_time)) + .add(millis(stat.query_time)) + .add(millis(stat.wait_time)) .add(stat.parse_count) .add(stat.bind_count) .add(stat.close) .add(stat.errors) .add(stat.cleaned) .add(stat.rollbacks) - .add(stat.connect_time.as_millis() as u64) + .add(millis(stat.connect_time)) .add(stat.connect_count); } diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 5e1168895..ab093c8be 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -238,14 +238,18 @@ impl Binding { Binding::Direct(Some(server)) => { debug!( "server is in \"{}\" state [{}]", - server.stats().state, + server.stats().get_state(), server.addr() ); - server.stats().state == state + server.stats().get_state() == state } Binding::MultiShard(servers, _) => servers.iter().all(|s| { - debug!("server is in \"{}\" state [{}]", s.stats().state, s.addr()); - s.stats().state == state + debug!( + "server is in \"{}\" state [{}]", + s.stats().get_state(), + s.addr() + ); + s.stats().get_state() == state }), _ => true, } diff --git a/pgdog/src/backend/pool/guard.rs b/pgdog/src/backend/pool/guard.rs index 8c005273f..0924ce9bf 100644 --- a/pgdog/src/backend/pool/guard.rs +++ b/pgdog/src/backend/pool/guard.rs @@ -96,7 +96,7 @@ impl Guard { } else { debug!( "[cleanup] no cleanup needed, server in \"{}\" state [{}]", - server.stats().state, + server.stats().get_state(), server.addr(), ); if let Err(err) = pool.checkin(server) { @@ -120,7 +120,7 @@ impl Guard { // Receive whatever data the client left before disconnecting. debug!( "[cleanup] draining data from \"{}\" server [{}]", - server.stats().state, + server.stats().get_state(), server.addr() ); server.drain().await?; @@ -138,7 +138,7 @@ impl Guard { if conn_recovery.can_rollback() { debug!( "[cleanup] rolling back server transaction, in \"{}\" state [{}]", - server.stats().state, + server.stats().get_state(), server.addr(), ); server.rollback().await?; @@ -152,7 +152,7 @@ impl Guard { debug!( "[cleanup] running {} cleanup queries, server in \"{}\" state [{}]", cleanup.len(), - server.stats().state, + server.stats().get_state(), server.addr() ); server.execute_batch(cleanup.queries()).await?; @@ -180,7 +180,7 @@ impl Guard { if sync_prepared { debug!( "[cleanup] syncing prepared statements, server in \"{}\" state [{}]", - server.stats().state, + server.stats().get_state(), server.addr() ); server.sync_prepared_statements().await?; @@ -510,7 +510,7 @@ mod test { .unwrap(); use crate::state::State; - assert_eq!(server.stats().state, State::ForceClose); + assert_eq!(server.stats().get_state(), State::ForceClose); assert!(server.needs_drain()); } @@ -556,7 +556,7 @@ mod test { .unwrap(); use crate::state::State; - assert_eq!(server.stats().state, State::ForceClose); + assert_eq!(server.stats().get_state(), State::ForceClose); assert!(server.needs_drain()); } @@ -669,7 +669,7 @@ mod test { .unwrap(); use crate::state::State; - assert_eq!(server.stats().state, State::ForceClose); + assert_eq!(server.stats().get_state(), State::ForceClose); assert!(server.in_transaction()); } @@ -730,7 +730,7 @@ mod test { .unwrap(); use crate::state::State; - assert_eq!(server.stats().state, State::ForceClose); + assert_eq!(server.stats().get_state(), State::ForceClose); assert!(server.needs_drain()); assert!(server.in_transaction()); } diff --git a/pgdog/src/backend/pool/inner.rs b/pgdog/src/backend/pool/inner.rs index f85458b48..975a742f3 100644 --- a/pgdog/src/backend/pool/inner.rs +++ b/pgdog/src/backend/pool/inner.rs @@ -293,7 +293,7 @@ impl Inner { let taken = std::mem::take(&mut self.taken); for conn in idle.iter_mut() { - conn.stats_mut().pool_id = destination.id(); + conn.stats_mut().set_pool_id(destination.id()); } (idle, taken) @@ -319,8 +319,8 @@ impl Inner { result.replenish = false; // Prevents deadlocks. if moved.id() != self.id { - server.stats_mut().pool_id = moved.id(); - server.stats_mut().update(); + server.stats_mut().set_pool_id(moved.id()); + server.stats().update(); moved.lock().maybe_check_in(server, now, stats)?; return Ok(result); } diff --git a/pgdog/src/backend/pool/pool_impl.rs b/pgdog/src/backend/pool/pool_impl.rs index fc645c7b8..ff71e7276 100644 --- a/pgdog/src/backend/pool/pool_impl.rs +++ b/pgdog/src/backend/pool/pool_impl.rs @@ -220,13 +220,15 @@ impl Pool { let now = if server.pooler_mode() == &PoolerMode::Session { Instant::now() } else { - server.stats().last_used + server.stats().last_used() }; let counts = { let stats = server.stats_mut(); - stats.client_id = None; - stats.reset_last_checkout() + stats.clear_client_id(); + let counts = stats.reset_last_checkout(); + stats.update(); + counts }; // Check everything and maybe check the connection diff --git a/pgdog/src/backend/pool/test/mod.rs b/pgdog/src/backend/pool/test/mod.rs index 6de94a438..981a3737e 100644 --- a/pgdog/src/backend/pool/test/mod.rs +++ b/pgdog/src/backend/pool/test/mod.rs @@ -440,7 +440,7 @@ async fn test_prepared_statements_limit() { || guard.prepared_statements_mut().contains("__pgdog_98") ); assert_eq!(guard.prepared_statements_mut().len(), 2); - assert_eq!(guard.stats().total.prepared_statements, 2); // stats are accurate. + assert_eq!(guard.stats().total().prepared_statements, 2); // stats are accurate. let pool = pool_with_prepared_capacity(100); @@ -468,14 +468,14 @@ async fn test_prepared_statements_limit() { let mut guard = pool.get(&Request::default()).await.unwrap(); assert!(guard.prepared_statements_mut().contains("__pgdog_99")); assert_eq!(guard.prepared_statements_mut().len(), 100); - assert_eq!(guard.stats().total.prepared_statements, 100); // stats are accurate. + assert_eq!(guard.stats().total().prepared_statements, 100); // stats are accurate. // Let's make sure Postgres agreees. guard.sync_prepared_statements().await.unwrap(); assert!(guard.prepared_statements_mut().contains("__pgdog_99")); assert_eq!(guard.prepared_statements_mut().len(), 100); - assert_eq!(guard.stats().total.prepared_statements, 100); // stats are accurate. + assert_eq!(guard.stats().total().prepared_statements, 100); // stats are accurate. } #[tokio::test] @@ -612,7 +612,7 @@ async fn test_move_conns_to() { assert_eq!(source.lock().total(), 0); let new_pool_id = destination.id(); for conn in destination.lock().idle_conns() { - assert_eq!(conn.stats().pool_id, new_pool_id); + assert_eq!(conn.stats().pool_id(), new_pool_id); } drop(conn2); diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 64c9cfe2c..162e4477e 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -329,7 +329,7 @@ impl Server { for message in queue.into_iter().flatten() { match self.stream().send(message).await { - Ok(sent) => self.stats.send(sent), + Ok(sent) => self.stats.send(sent, message.code() as u8), Err(err) => { self.stats.state(State::Error); return Err(err.into()); @@ -381,7 +381,7 @@ impl Server { err, message.code(), self.prepared_statements.state(), - self.stats.state, + self.stats.get_state(), ); return Err(err); } @@ -395,7 +395,7 @@ impl Server { } }; - self.stats.receive(message.len()); + self.stats.receive(message.len(), message.code() as u8); match message.code() { 'Z' => { @@ -570,15 +570,15 @@ impl Server { /// Server can execute a query. pub fn in_sync(&self) -> bool { matches!( - self.stats().state, + self.stats().get_state(), State::Idle | State::IdleInTransaction | State::TransactionError ) } - /// Server is done executing all queries and isz + /// Server is done executing all queries and is /// not inside a transaction. pub fn can_check_in(&self) -> bool { - self.stats().state == State::Idle + self.stats().get_state() == State::Idle } /// Server hasn't sent all messages yet. @@ -599,7 +599,7 @@ impl Server { /// The server connection permanently failed. #[inline] pub fn error(&self) -> bool { - self.stats().state == State::Error + self.stats().get_state() == State::Error } /// Did the schema change and prepared statements are broken. @@ -620,7 +620,7 @@ impl Server { /// Close the connection, don't do any recovery. pub fn force_close(&self) -> bool { - self.stats().state == State::ForceClose || self.io_in_progress() + self.stats().get_state() == State::ForceClose || self.io_in_progress() } /// Server parameters. @@ -870,19 +870,19 @@ impl Server { /// How old this connection is. #[inline] pub fn age(&self, instant: Instant) -> Duration { - instant.duration_since(self.stats().created_at) + instant.duration_since(self.stats().created_at()) } /// How long this connection has been idle. #[inline] pub fn idle_for(&self, instant: Instant) -> Duration { - instant.duration_since(self.stats().last_used) + instant.duration_since(self.stats().last_used()) } /// How long has it been since the last connection healthcheck. #[inline] pub fn healthcheck_age(&self, instant: Instant) -> Duration { - if let Some(last_healthcheck) = self.stats().last_healthcheck { + if let Some(last_healthcheck) = self.stats().last_healthcheck() { instant.duration_since(last_healthcheck) } else { Duration::MAX @@ -983,7 +983,7 @@ impl Drop for Server { info!( "closing server connection [{}, state: {}, reason: {}]", self.addr, - self.stats.state, + self.stats.get_state(), self.disconnect_reason.take().unwrap_or_default(), ); @@ -1173,7 +1173,7 @@ pub mod test { assert!(server.done()); assert_eq!( - server.stats().total.idle_in_transaction_time, + server.stats().total().idle_in_transaction_time, Duration::ZERO ); } @@ -1561,7 +1561,7 @@ pub mod test { } assert!(!server.done()); // We're not in sync (extended protocol) - assert_eq!(server.stats().state, State::Idle); + assert_eq!(server.stats().get_state(), State::Idle); assert!(server.prepared_statements.state().queue().is_empty()); // Queue is empty assert!(!server.prepared_statements.state().in_sync()); @@ -2009,26 +2009,26 @@ pub mod test { async fn test_query_stats() { let mut server = test_server().await; - assert_eq!(server.stats().last_checkout.queries, 0); - assert_eq!(server.stats().last_checkout.transactions, 0); + assert_eq!(server.stats().last_checkout().queries, 0); + assert_eq!(server.stats().last_checkout().transactions, 0); for i in 1..26 { server.execute("SELECT 1").await.unwrap(); - assert_eq!(server.stats().last_checkout.queries, i); - assert_eq!(server.stats().last_checkout.transactions, i); - assert_eq!(server.stats().total.queries, i); - assert_eq!(server.stats().total.transactions, i); + assert_eq!(server.stats().last_checkout().queries, i); + assert_eq!(server.stats().last_checkout().transactions, i); + assert_eq!(server.stats().total().queries, i); + assert_eq!(server.stats().total().transactions, i); } let counts = server.stats_mut().reset_last_checkout(); assert_eq!(counts.queries, 25); assert_eq!(counts.transactions, 25); - assert_eq!(server.stats().last_checkout.queries, 0); - assert_eq!(server.stats().last_checkout.transactions, 0); - assert_eq!(server.stats().total.queries, 25); - assert_eq!(server.stats().total.transactions, 25); + assert_eq!(server.stats().last_checkout().queries, 0); + assert_eq!(server.stats().last_checkout().transactions, 0); + assert_eq!(server.stats().total().queries, 25); + assert_eq!(server.stats().total().transactions, 25); for i in 1..26 { server.execute("BEGIN").await.unwrap(); @@ -2036,17 +2036,17 @@ pub mod test { server.execute("SELECT 2").await.unwrap(); server.execute("COMMIT").await.unwrap(); - assert_eq!(server.stats().last_checkout.queries, i * 4); - assert_eq!(server.stats().last_checkout.transactions, i); - assert_eq!(server.stats().total.queries, 25 + (i * 4)); - assert_eq!(server.stats().total.transactions, 25 + i); + assert_eq!(server.stats().last_checkout().queries, i * 4); + assert_eq!(server.stats().last_checkout().transactions, i); + assert_eq!(server.stats().total().queries, 25 + (i * 4)); + assert_eq!(server.stats().total().transactions, 25 + i); } let counts = server.stats_mut().reset_last_checkout(); assert_eq!(counts.queries, 25 * 4); assert_eq!(counts.transactions, 25); - assert_eq!(server.stats().total.queries, 25 + (25 * 4)); - assert_eq!(server.stats().total.transactions, 25 + 25); + assert_eq!(server.stats().total().queries, 25 + (25 * 4)); + assert_eq!(server.stats().total().transactions, 25 + 25); } #[tokio::test] @@ -2328,17 +2328,17 @@ pub mod test { server.execute("SELECT 1").await.unwrap(); assert_eq!( - server.stats().total.idle_in_transaction_time, + server.stats().total().idle_in_transaction_time, Duration::ZERO, ); assert_eq!( - server.stats().last_checkout.idle_in_transaction_time, + server.stats().last_checkout().idle_in_transaction_time, Duration::ZERO, ); server.execute("BEGIN").await.unwrap(); assert_eq!( - server.stats().total.idle_in_transaction_time, + server.stats().total().idle_in_transaction_time, Duration::ZERO, ); @@ -2347,7 +2347,7 @@ pub mod test { server.execute("SELECT 2").await.unwrap(); - let idle_time = server.stats().total.idle_in_transaction_time; + let idle_time = server.stats().total().idle_in_transaction_time; assert!( idle_time >= Duration::from_millis(50), "Expected idle time >= 50ms, got {:?}", @@ -2363,7 +2363,7 @@ pub mod test { server.execute("COMMIT").await.unwrap(); - let final_idle_time = server.stats().total.idle_in_transaction_time; + let final_idle_time = server.stats().total().idle_in_transaction_time; assert!( final_idle_time >= Duration::from_millis(150), "Expected final idle time >= 150ms, got {:?}", @@ -2377,7 +2377,7 @@ pub mod test { server.execute("SELECT 3").await.unwrap(); assert_eq!( - server.stats().total.idle_in_transaction_time, + server.stats().total().idle_in_transaction_time, final_idle_time, ); } @@ -2445,7 +2445,7 @@ pub mod test { "protocol should be out of sync" ); assert!( - server.stats().state == State::Error, + server.stats().get_state() == State::Error, "state should be Error after detecting desync" ) } diff --git a/pgdog/src/backend/stats.rs b/pgdog/src/backend/stats.rs index 15dfc8d22..1841a67fa 100644 --- a/pgdog/src/backend/stats.rs +++ b/pgdog/src/backend/stats.rs @@ -1,10 +1,11 @@ //! Keep track of server stats. use std::ops::{Deref, DerefMut}; +use std::sync::Arc; use fnv::FnvHashMap as HashMap; use once_cell::sync::Lazy; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; pub use pgdog_stats::server::Counts; use tokio::time::Instant; @@ -17,51 +18,34 @@ use crate::{ use super::pool::Address; -static STATS: Lazy>> = - Lazy::new(|| Mutex::new(HashMap::default())); +static STATS: Lazy>>>> = + Lazy::new(|| RwLock::new(HashMap::default())); /// Get a copy of latest stats. pub fn stats() -> HashMap { - STATS.lock().clone() + STATS + .read() + .iter() + .map(|(k, v)| (*k, v.lock().clone())) + .collect() } /// Get idle-in-transaction server connections for connection pool. pub fn idle_in_transaction(pool: &Pool) -> usize { STATS - .lock() + .read() .values() .filter(|stat| { - stat.stats.pool_id == pool.id() && stat.stats.state == State::IdleInTransaction + let guard = stat.lock(); + guard.stats.pool_id == pool.id() && guard.stats.state == State::IdleInTransaction }) .count() } -/// Update stats to latest version. -fn update(id: BackendKeyData, stats: Stats) { - let mut guard = STATS.lock(); - if let Some(entry) = guard.get_mut(&id) { - entry.stats = stats; - } -} - -/// Server is disconnecting. -fn disconnect(id: &BackendKeyData) { - STATS.lock().remove(id); -} - -/// Connected server. -#[derive(Clone, Debug)] -pub struct ConnectedServer { - pub stats: Stats, - pub addr: Address, - pub application_name: String, - pub client: Option, -} - -/// Server statistics. -#[derive(Copy, Clone, Debug)] -pub struct Stats { - inner: pgdog_stats::server::Stats, +/// Core server statistics (shared between local and global). +#[derive(Clone, Debug, Copy)] +pub struct ServerStats { + pub inner: pgdog_stats::server::Stats, pub id: BackendKeyData, pub last_used: Instant, pub last_healthcheck: Option, @@ -72,7 +56,28 @@ pub struct Stats { idle_in_transaction_timer: Option, } -impl Deref for Stats { +impl ServerStats { + fn new(id: BackendKeyData, options: &ServerOptions, config: &Memory) -> Self { + let now = Instant::now(); + let mut inner = pgdog_stats::server::Stats::default(); + inner.memory = *MemoryStats::new(config); + inner.pool_id = options.pool_id; + + Self { + inner, + id, + last_used: now, + last_healthcheck: None, + created_at: now, + client_id: None, + query_timer: None, + transaction_timer: None, + idle_in_transaction_timer: None, + } + } +} + +impl Deref for ServerStats { type Target = pgdog_stats::server::Stats; fn deref(&self) -> &Self::Target { @@ -80,12 +85,32 @@ impl Deref for Stats { } } -impl DerefMut for Stats { +impl DerefMut for ServerStats { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } } +/// Connected server (shared globally). +#[derive(Clone, Debug)] +pub struct ConnectedServer { + pub stats: ServerStats, + pub addr: Address, + pub application_name: String, + pub client: Option, +} + +/// Server statistics handle. +/// +/// Holds local stats for fast reads during pool operations, +/// and a reference to shared stats for global visibility. +/// Syncs local to shared on I/O operations. +#[derive(Clone, Debug)] +pub struct Stats { + local: ServerStats, + shared: Arc>, +} + impl Stats { /// Register new server with statistics. pub fn connect( @@ -95,80 +120,70 @@ impl Stats { options: &ServerOptions, config: &Memory, ) -> Self { - let now = Instant::now(); - let mut stats = Stats { - inner: pgdog_stats::server::Stats::default(), - id, - last_used: now, - last_healthcheck: None, - created_at: now, - query_timer: None, - transaction_timer: None, - client_id: None, - idle_in_transaction_timer: None, + let local = ServerStats::new(id, options, config); + + let server = ConnectedServer { + stats: local.clone(), + addr: addr.clone(), + application_name: params.get_default("application_name", "PgDog").to_owned(), + client: None, }; - stats.inner.memory = *MemoryStats::new(config); - stats.inner.pool_id = options.pool_id; + let shared = Arc::new(Mutex::new(server)); + STATS.write().insert(id, Arc::clone(&shared)); - STATS.lock().insert( - id, - ConnectedServer { - stats, - addr: addr.clone(), - application_name: params.get_default("application_name", "PgDog").to_owned(), - client: None, - }, - ); + Stats { local, shared } + } - stats + /// Sync local stats to shared (called on I/O operations). + #[inline] + fn sync_to_shared(&self) { + self.shared.lock().stats = self.local.clone(); } fn transaction_state(&mut self, now: Instant, state: State) { - self.total.transactions += 1; - self.last_checkout.transactions += 1; - self.state = state; - self.last_used = now; - if let Some(transaction_timer) = self.transaction_timer.take() { + self.local.total.transactions += 1; + self.local.last_checkout.transactions += 1; + self.local.state = state; + self.local.last_used = now; + if let Some(transaction_timer) = self.local.transaction_timer.take() { let duration = now.duration_since(transaction_timer); - self.total.transaction_time += duration; - self.last_checkout.transaction_time += duration; + self.local.total.transaction_time += duration; + self.local.last_checkout.transaction_time += duration; } - self.update(); + self.sync_to_shared(); } - pub fn link_client(&mut self, client_name: &str, server_server: &str, id: &BackendKeyData) { - self.client_id = Some(*id); - if client_name != server_server { - let mut guard = STATS.lock(); - if let Some(entry) = guard.get_mut(&self.id) { - entry.application_name.clear(); - entry.application_name.push_str(client_name); - } + pub fn link_client(&mut self, client_name: &str, server_name: &str, id: &BackendKeyData) { + self.local.client_id = Some(*id); + if client_name != server_name { + let mut guard = self.shared.lock(); + guard.stats.client_id = self.local.client_id; + guard.application_name.clear(); + guard.application_name.push_str(client_name); } } pub fn parse_complete(&mut self) { - self.total.parse += 1; - self.last_checkout.parse += 1; - self.total.prepared_statements += 1; - self.last_checkout.prepared_statements += 1; + self.local.total.parse += 1; + self.local.last_checkout.parse += 1; + self.local.total.prepared_statements += 1; + self.local.last_checkout.prepared_statements += 1; } - /// Overwrite how many prepared statements we have in the cache - /// for stats. + /// Overwrite how many prepared statements we have in the cache for stats. pub fn set_prepared_statements(&mut self, size: usize) { - self.total.prepared_statements = size; - self.total.prepared_sync += 1; - self.last_checkout.prepared_sync += 1; - self.update(); + self.local.total.prepared_statements = size; + self.local.total.prepared_sync += 1; + self.local.last_checkout.prepared_sync += 1; + self.sync_to_shared(); } pub fn close_many(&mut self, closed: usize, size: usize) { - self.total.prepared_statements = size; - self.total.close += closed; - self.last_checkout.close += closed; - self.update(); + self.local.total.prepared_statements = size; + self.local.total.close += closed; + self.local.last_checkout.close += closed; + self.sync_to_shared(); } pub fn copy_mode(&mut self) { @@ -176,8 +191,8 @@ impl Stats { } pub fn bind_complete(&mut self) { - self.total.bind += 1; - self.last_checkout.bind += 1; + self.local.total.bind += 1; + self.local.last_checkout.bind += 1; } /// A transaction has been completed. @@ -187,8 +202,8 @@ impl Stats { /// Increment two-phase commit transaction count. pub fn transaction_2pc(&mut self) { - self.last_checkout.transactions_2pc += 1; - self.total.transactions_2pc += 1; + self.local.last_checkout.transactions_2pc += 1; + self.local.total.transactions_2pc += 1; } /// Error occurred in a transaction. @@ -198,111 +213,168 @@ impl Stats { /// An error occurred in general. pub fn error(&mut self) { - self.total.errors += 1; - self.last_checkout.errors += 1; + self.local.total.errors += 1; + self.local.last_checkout.errors += 1; } /// A query has been completed. pub fn query(&mut self, now: Instant, idle_in_transaction: bool) { - self.total.queries += 1; - self.last_checkout.queries += 1; + self.local.total.queries += 1; + self.local.last_checkout.queries += 1; if idle_in_transaction { - self.idle_in_transaction_timer = Some(now); + self.local.idle_in_transaction_timer = Some(now); } - if let Some(query_timer) = self.query_timer.take() { + if let Some(query_timer) = self.local.query_timer.take() { let duration = now.duration_since(query_timer); - self.total.query_time += duration; - self.last_checkout.query_time += duration; + self.local.total.query_time += duration; + self.local.last_checkout.query_time += duration; } } pub(crate) fn set_timers(&mut self, now: Instant) { - self.transaction_timer = Some(now); - self.query_timer = Some(now); + self.local.transaction_timer = Some(now); + self.local.query_timer = Some(now); } /// Manual state change. pub fn state(&mut self, state: State) { - let update = self.state != state; - self.state = state; - if update { - self.activate(); - self.update(); - } - } - - fn activate(&mut self) { - // Client started a query/transaction. - if self.state == State::Active { - let now = Instant::now(); - if self.transaction_timer.is_none() { - self.transaction_timer = Some(now); - } - if self.query_timer.is_none() { - self.query_timer = Some(now); - } - if let Some(idle_in_transaction_timer) = self.idle_in_transaction_timer.take() { - let elapsed = now.duration_since(idle_in_transaction_timer); - self.last_checkout.idle_in_transaction_time += elapsed; - self.total.idle_in_transaction_time += elapsed; + if self.local.state != state { + self.local.state = state; + if state == State::Active { + let now = Instant::now(); + if self.local.transaction_timer.is_none() { + self.local.transaction_timer = Some(now); + } + if self.local.query_timer.is_none() { + self.local.query_timer = Some(now); + } + if let Some(idle_in_transaction_timer) = self.local.idle_in_transaction_timer.take() + { + let elapsed = now.duration_since(idle_in_transaction_timer); + self.local.last_checkout.idle_in_transaction_time += elapsed; + self.local.total.idle_in_transaction_time += elapsed; + } } + self.sync_to_shared(); } } - /// Send bytes to server. - pub fn send(&mut self, bytes: usize) { - self.total.bytes_sent += bytes; - self.last_checkout.bytes_sent += bytes; + /// Send bytes to server - syncs to shared for real-time visibility. + pub fn send(&mut self, bytes: usize, code: u8) { + self.local.total.bytes_sent += bytes; + self.local.last_checkout.bytes_sent += bytes; + self.local.last_sent = code; + self.sync_to_shared(); } - /// Receive bytes from server. - pub fn receive(&mut self, bytes: usize) { - self.total.bytes_received += bytes; - self.last_checkout.bytes_received += bytes; + /// Receive bytes from server - syncs to shared for real-time visibility. + pub fn receive(&mut self, bytes: usize, code: u8) { + self.local.total.bytes_received += bytes; + self.local.last_checkout.bytes_received += bytes; + self.local.last_received = code; + self.sync_to_shared(); } - /// Track healtchecks. + /// Track healthchecks. pub fn healthcheck(&mut self) { - self.total.healthchecks += 1; - self.last_checkout.healthchecks += 1; - self.last_healthcheck = Some(Instant::now()); - self.update(); + self.local.total.healthchecks += 1; + self.local.last_checkout.healthchecks += 1; + self.local.last_healthcheck = Some(Instant::now()); + self.sync_to_shared(); } #[inline] pub fn memory_used(&mut self, stats: MemoryStats) { - self.memory = *stats; + self.local.memory = *stats; } #[inline] pub fn cleaned(&mut self) { - self.last_checkout.cleaned += 1; - self.total.cleaned += 1; + self.local.last_checkout.cleaned += 1; + self.local.total.cleaned += 1; } /// Track rollbacks. pub fn rollback(&mut self) { - self.total.rollbacks += 1; - self.last_checkout.rollbacks += 1; - self.update(); - } - - /// Update server stats globally. - pub fn update(&self) { - update(self.id, *self) + self.local.total.rollbacks += 1; + self.local.last_checkout.rollbacks += 1; + self.sync_to_shared(); } /// Server is closing. pub(super) fn disconnect(&self) { - disconnect(&self.id); + STATS.write().remove(&self.local.id); } /// Reset last_checkout counts. pub fn reset_last_checkout(&mut self) -> Counts { - let counts = self.last_checkout; - self.last_checkout = Counts::default(); + let counts = self.local.last_checkout; + self.local.last_checkout = Counts::default(); counts } + + // Fast accessor methods - read from local, no locking. + + /// Get current state (local, no lock). + #[inline] + pub fn get_state(&self) -> State { + self.local.state + } + + /// Get created_at timestamp (local, no lock). + #[inline] + pub fn created_at(&self) -> Instant { + self.local.created_at + } + + /// Get last_used timestamp (local, no lock). + #[inline] + pub fn last_used(&self) -> Instant { + self.local.last_used + } + + /// Get last_healthcheck timestamp (local, no lock). + #[inline] + pub fn last_healthcheck(&self) -> Option { + self.local.last_healthcheck + } + + /// Get pool_id (local, no lock). + #[inline] + pub fn pool_id(&self) -> u64 { + self.local.pool_id + } + + /// Set pool_id. + #[inline] + pub fn set_pool_id(&mut self, pool_id: u64) { + self.local.pool_id = pool_id; + self.shared.lock().stats.pool_id = pool_id; + } + + /// Get total counts (local, no lock). + #[inline] + pub fn total(&self) -> Counts { + self.local.total + } + + /// Get last_checkout counts (local, no lock). + #[inline] + pub fn last_checkout(&self) -> Counts { + self.local.last_checkout + } + + /// Clear client_id. + #[inline] + pub fn clear_client_id(&mut self) { + self.local.client_id = None; + } + + /// Legacy update method - syncs local to shared. + #[inline] + pub fn update(&self) { + self.sync_to_shared(); + } } diff --git a/pgdog/src/stats/pools.rs b/pgdog/src/stats/pools.rs index 39fd6240d..b4dffaf9e 100644 --- a/pgdog/src/stats/pools.rs +++ b/pgdog/src/stats/pools.rs @@ -1,4 +1,5 @@ use crate::backend::{self, databases::databases}; +use crate::util::millis; use super::{Measurement, Metric, OpenMetric}; @@ -178,32 +179,32 @@ impl Pools { total_xact_time.push(Measurement { labels: labels.clone(), - measurement: totals.xact_time.as_millis().into(), + measurement: millis(totals.xact_time).into(), }); avg_xact_time.push(Measurement { labels: labels.clone(), - measurement: averages.xact_time.as_millis().into(), + measurement: millis(averages.xact_time).into(), }); total_idle_xact_time.push(Measurement { labels: labels.clone(), - measurement: totals.idle_xact_time.as_millis().into(), + measurement: millis(totals.idle_xact_time).into(), }); avg_idle_xact_time.push(Measurement { labels: labels.clone(), - measurement: averages.idle_xact_time.as_millis().into(), + measurement: millis(averages.idle_xact_time).into(), }); total_query_time.push(Measurement { labels: labels.clone(), - measurement: totals.query_time.as_millis().into(), + measurement: millis(totals.query_time).into(), }); avg_query_time.push(Measurement { labels: labels.clone(), - measurement: averages.query_time.as_millis().into(), + measurement: millis(averages.query_time).into(), }); total_close.push(Measurement { @@ -248,12 +249,12 @@ impl Pools { total_connect_time.push(Measurement { labels: labels.clone(), - measurement: totals.connect_time.as_millis().into(), + measurement: millis(totals.connect_time).into(), }); avg_connect_time.push(Measurement { labels: labels.clone(), - measurement: averages.connect_time.as_millis().into(), + measurement: millis(averages.connect_time).into(), }); total_connect_count.push(Measurement { diff --git a/pgdog/src/util.rs b/pgdog/src/util.rs index 61407976e..8c735dbd8 100644 --- a/pgdog/src/util.rs +++ b/pgdog/src/util.rs @@ -12,6 +12,11 @@ pub fn format_time(time: DateTime) -> String { time.format("%Y-%m-%d %H:%M:%S%.3f %Z").to_string() } +/// Convert Duration to milliseconds with 3 decimal places precision. +pub fn millis(duration: Duration) -> f64 { + (duration.as_secs_f64() * 1_000_000.0).round() / 1000.0 +} + pub fn human_duration_optional(duration: Option) -> String { if let Some(duration) = duration { human_duration(duration) diff --git a/pgdog/tests/pgbouncer/pgdog.toml b/pgdog/tests/pgbouncer/pgdog.toml index 911dc0cf0..171bd7bd9 100644 --- a/pgdog/tests/pgbouncer/pgdog.toml +++ b/pgdog/tests/pgbouncer/pgdog.toml @@ -2,6 +2,7 @@ workers = 2 min_pool_size = 0 prepared_statements_limit = 500 +openmetrics_port = 9090 [[databases]] name = "pgdog"