From 190cf40c0b8f6f00917fd14d4fde733a7798a3cc Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Wed, 7 Jan 2026 11:23:07 +0000 Subject: [PATCH 01/15] feat: implement HTTP API server with health, status, and metrics endpoints; add dashboard UI - Added `spawn_api_server` to serve HTTP endpoints for health checks, system status, and metrics. - Added static `dashboard.html` for a web-based status UI. --- forester-utils/Cargo.toml | 30 +- forester-utils/src/account_zero_copy.rs | 61 +- forester-utils/src/forester_epoch.rs | 11 +- forester/Cargo.toml | 5 +- forester/src/api_server.rs | 154 +++ forester/src/cli.rs | 171 +-- forester/src/config.rs | 15 + forester/src/epoch_manager.rs | 38 +- forester/src/forester_status.rs | 590 +++++++- forester/src/lib.rs | 26 +- forester/src/main.rs | 5 + forester/src/processor/v1/send_transaction.rs | 56 +- forester/src/queue_helpers.rs | 234 +++- forester/src/tree_data_sync.rs | 42 +- forester/static/dashboard.html | 1219 +++++++++++++++++ forester/test.sh | 10 - forester/tests/e2e_test.rs | 1 + forester/tests/priority_fee_test.rs | 2 + forester/tests/test_utils.rs | 1 + program-tests/registry-test/tests/tests.rs | 2 + program-tests/utils/src/e2e_test_env.rs | 2 + program-tests/utils/src/setup_forester.rs | 2 + sdk-libs/client/src/rpc/client.rs | 31 + sdk-libs/client/src/rpc/rpc_trait.rs | 7 + sdk-libs/program-test/src/program_test/rpc.rs | 15 + 25 files changed, 2518 insertions(+), 212 deletions(-) create mode 100644 forester/src/api_server.rs create mode 100644 forester/static/dashboard.html delete mode 100644 forester/test.sh diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml index 9bbb143256..72c92b7e64 100644 --- a/forester-utils/Cargo.toml +++ b/forester-utils/Cargo.toml @@ -13,20 +13,16 @@ devenv = ["v2", "light-client/devenv", "light-prover-client/devenv"] v2 = ["light-client/v2"] [dependencies] - light-hash-set = { workspace = true } light-hasher = { workspace = true, features = ["poseidon"] } light-concurrent-merkle-tree = { workspace = true } light-indexed-merkle-tree = { workspace = true } -light-indexed-array = { workspace = true } light-compressed-account = { workspace = true, features = ["std"] } light-batched-merkle-tree = { workspace = true } -light-merkle-tree-metadata = { workspace = true } light-merkle-tree-reference = { workspace = true } light-sparse-merkle-tree = { workspace = true } light-account-checks = { workspace = true } light-sdk = { workspace = true } - light-client = { workspace = true } light-prover-client = { workspace = true } light-registry = { workspace = true, features = ["cpi"] } @@ -35,29 +31,15 @@ light-token-interface = { workspace = true } solana-instruction = { workspace = true } solana-pubkey = { workspace = true } - -tokio = { workspace = true } -futures = { workspace = true } -async-stream = "0.3" - -anchor-lang = { workspace = true } - solana-sdk = { workspace = true } +anchor-lang = { workspace = true } -thiserror = { workspace = true } -anyhow = { workspace = true } - -tracing = { workspace = true } - -num-traits = { workspace = true } -num-bigint = { workspace = true } - +tokio = { workspace = true } bb8 = { workspace = true } async-trait = { workspace = true } governor = { workspace = true } +num-traits = { workspace = true } + +thiserror = { workspace = true } +tracing = { workspace = true } -[dev-dependencies] -tokio-postgres = "0.7" -bs58 = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } diff --git a/forester-utils/src/account_zero_copy.rs 
b/forester-utils/src/account_zero_copy.rs index b1cedc2b3c..f5113a58e4 100644 --- a/forester-utils/src/account_zero_copy.rs +++ b/forester-utils/src/account_zero_copy.rs @@ -2,10 +2,12 @@ use std::{fmt, marker::PhantomData, mem, pin::Pin}; use account_compression::processor::initialize_address_merkle_tree::Pubkey; use light_client::rpc::Rpc; -use light_concurrent_merkle_tree::copy::ConcurrentMerkleTreeCopy; +use light_concurrent_merkle_tree::{ + copy::ConcurrentMerkleTreeCopy, errors::ConcurrentMerkleTreeError, +}; use light_hash_set::HashSet; use light_hasher::Hasher; -use light_indexed_merkle_tree::copy::IndexedMerkleTreeCopy; +use light_indexed_merkle_tree::{copy::IndexedMerkleTreeCopy, errors::IndexedMerkleTreeError}; use num_traits::{CheckedAdd, CheckedSub, ToBytes, Unsigned}; use solana_sdk::account::Account; @@ -91,3 +93,58 @@ where IndexedMerkleTreeCopy::from_bytes_copy(&account.data[8 + mem::size_of::()..]).unwrap() } + +/// Parse ConcurrentMerkleTree from raw account data bytes. +pub fn parse_concurrent_merkle_tree_from_bytes( + data: &[u8], +) -> Result, ConcurrentMerkleTreeError> +where + H: Hasher, +{ + let offset = 8 + mem::size_of::(); + if data.len() <= offset { + return Err(ConcurrentMerkleTreeError::BufferSize(offset, data.len())); + } + ConcurrentMerkleTreeCopy::from_bytes_copy(&data[offset..]) +} + +/// Parse IndexedMerkleTree from raw account data byte +pub fn parse_indexed_merkle_tree_from_bytes( + data: &[u8], +) -> Result, IndexedMerkleTreeError> +where + H: Hasher, + I: CheckedAdd + + CheckedSub + + Copy + + Clone + + fmt::Debug + + PartialOrd + + ToBytes + + TryFrom + + Unsigned, + usize: From, +{ + let offset = 8 + mem::size_of::(); + if data.len() <= offset { + return Err(IndexedMerkleTreeError::ConcurrentMerkleTree( + ConcurrentMerkleTreeError::BufferSize(offset, data.len()), + )); + } + IndexedMerkleTreeCopy::from_bytes_copy(&data[offset..]) +} + +/// Parse HashSet from raw queue account data bytes +/// +/// # Safety +/// Same safety requirements as `get_hash_set`. +pub unsafe fn parse_hash_set_from_bytes( + data: &[u8], +) -> Result { + let offset = 8 + mem::size_of::(); + if data.len() <= offset { + return Err(light_hash_set::HashSetError::BufferSize(offset, data.len())); + } + let mut data_copy = data[offset..].to_vec(); + HashSet::from_bytes_copy(&mut data_copy) +} diff --git a/forester-utils/src/forester_epoch.rs b/forester-utils/src/forester_epoch.rs index 38c006037b..d92bac8fdb 100644 --- a/forester-utils/src/forester_epoch.rs +++ b/forester-utils/src/forester_epoch.rs @@ -62,6 +62,7 @@ pub struct TreeAccounts { // TODO: evaluate whether we need pub is_rolledover: bool, pub tree_type: TreeType, + pub owner: Pubkey, } impl TreeAccounts { @@ -70,12 +71,14 @@ impl TreeAccounts { queue: Pubkey, tree_type: TreeType, is_rolledover: bool, + owner: Pubkey, ) -> Self { Self { merkle_tree, queue, tree_type, is_rolledover, + owner, } } } @@ -170,9 +173,15 @@ impl TreeForesterSchedule { tree_accounts: *tree_accounts, slots: Vec::new(), }; + // V2 trees use merkle_tree pubkey for eligibility check on-chain, + // V1 trees use queue pubkey. Must match on-chain check_forester logic. 
+ let eligibility_pubkey = match tree_accounts.tree_type { + TreeType::StateV2 | TreeType::AddressV2 => &tree_accounts.merkle_tree, + _ => &tree_accounts.queue, + }; _self.slots = get_schedule_for_forester_in_queue( solana_slot, - &_self.tree_accounts.queue, + eligibility_pubkey, epoch_pda.registered_weight, forester_epoch_pda, )?; diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 5d829727e4..b0ae60bab6 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -8,6 +8,7 @@ publish = false anchor-lang = { workspace = true } clap = { version = "4.5.53", features = ["derive", "env"] } solana-sdk = { workspace = true } +solana-commitment-config = { workspace = true } solana-client = { workspace = true } solana-account-decoder = { workspace = true } solana-program = { workspace = true } @@ -50,12 +51,13 @@ anyhow = { workspace = true } prometheus = "0.14" lazy_static = { workspace = true } -warp = "0.4" +warp = { version = "0.4", features = ["server"] } dashmap = { workspace = true } scopeguard = "1.2" itertools = "0.14" async-channel = "2.5" solana-pubkey = { workspace = true } +dotenvy = "0.15" [dev-dependencies] serial_test = { workspace = true } @@ -63,7 +65,6 @@ light-prover-client = { workspace = true, features = ["devenv"] } light-test-utils = { workspace = true } light-program-test = { workspace = true, features = ["devenv"] } light-token-client = { workspace = true } -dotenvy = "0.15" light-compressed-token = { workspace = true } rand = { workspace = true } create-address-test-program = { workspace = true } diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs new file mode 100644 index 0000000000..637f611330 --- /dev/null +++ b/forester/src/api_server.rs @@ -0,0 +1,154 @@ +use std::{collections::HashMap, net::SocketAddr}; + +use serde::{Deserialize, Serialize}; +use tracing::{error, info}; +use warp::{http::Response, Filter}; + +use crate::{forester_status::get_forester_status_blocking, metrics::REGISTRY}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthResponse { + pub status: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ErrorResponse { + pub error: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetricsResponse { + pub transactions_processed_total: HashMap, + pub transaction_rate: HashMap, + pub last_run_timestamp: i64, + pub forester_balances: HashMap, + pub queue_lengths: HashMap, +} + +const DASHBOARD_HTML: &str = include_str!("../static/dashboard.html"); + +pub fn spawn_api_server(rpc_url: String, port: u16) { + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + info!("Starting HTTP API server on {}", addr); + + let dashboard_route = warp::path::end().and(warp::get()).map(|| { + Response::builder() + .header("content-type", "text/html; charset=utf-8") + .body(DASHBOARD_HTML) + }); + + let health_route = warp::path("health").and(warp::get()).map(|| { + warp::reply::json(&HealthResponse { + status: "ok".to_string(), + }) + }); + + let status_route = warp::path("status").and(warp::get()).and_then(move || { + let rpc_url = rpc_url.clone(); + async move { + match tokio::task::spawn_blocking(move || { + get_forester_status_blocking(&rpc_url) + }) + .await + { + Ok(Ok(status)) => Ok::<_, warp::Rejection>(warp::reply::json(&status)), + Ok(Err(e)) => { + error!("Failed to get forester status: {:?}", e); + let error_response = ErrorResponse { + error: format!("Failed to get 
forester status: {}", e), + }; + Ok(warp::reply::json(&error_response)) + } + Err(e) => { + error!("Task join error: {:?}", e); + let error_response = ErrorResponse { + error: format!("Task join error: {}", e), + }; + Ok(warp::reply::json(&error_response)) + } + } + } + }); + + let metrics_route = warp::path!("metrics" / "json").and(warp::get()).map(|| { + let metrics = get_metrics_json(); + warp::reply::json(&metrics) + }); + + let routes = dashboard_route + .or(health_route) + .or(status_route) + .or(metrics_route); + + warp::serve(routes).run(addr).await; + }); + }); +} + +fn get_metrics_json() -> MetricsResponse { + use prometheus::Encoder; + + let encoder = prometheus::TextEncoder::new(); + let metric_families = REGISTRY.gather(); + let mut buffer = Vec::new(); + let _ = encoder.encode(&metric_families, &mut buffer); + let text = String::from_utf8_lossy(&buffer); + + let mut transactions_processed: HashMap = HashMap::new(); + let mut transaction_rate: HashMap = HashMap::new(); + let mut last_run_timestamp: i64 = 0; + let mut forester_balances: HashMap = HashMap::new(); + let mut queue_lengths: HashMap = HashMap::new(); + + for line in text.lines() { + if line.starts_with('#') || line.is_empty() { + continue; + } + + if let Some((metric_part, value_str)) = line.rsplit_once(' ') { + let value: f64 = value_str.parse().unwrap_or(0.0); + + if metric_part.starts_with("forester_transactions_processed_total") { + if let Some(epoch) = extract_label(metric_part, "epoch") { + transactions_processed.insert(epoch, value as u64); + } + } else if metric_part.starts_with("forester_transaction_rate") { + if let Some(epoch) = extract_label(metric_part, "epoch") { + transaction_rate.insert(epoch, value); + } + } else if metric_part.starts_with("forester_last_run_timestamp") { + last_run_timestamp = value as i64; + } else if metric_part.starts_with("forester_sol_balance") { + if let Some(pubkey) = extract_label(metric_part, "pubkey") { + forester_balances.insert(pubkey, value); + } + } else if metric_part.starts_with("queue_length") { + if let Some(tree_pubkey) = extract_label(metric_part, "tree_pubkey") { + queue_lengths.insert(tree_pubkey, value as i64); + } + } + } + } + + MetricsResponse { + transactions_processed_total: transactions_processed, + transaction_rate, + last_run_timestamp, + forester_balances, + queue_lengths, + } +} + +fn extract_label(metric_part: &str, label_name: &str) -> Option { + let label_pattern = format!("{}=\"", label_name); + if let Some(start) = metric_part.find(&label_pattern) { + let value_start = start + label_pattern.len(); + if let Some(end) = metric_part[value_start..].find('"') { + return Some(metric_part[value_start..value_start + end].to_string()); + } + } + None +} diff --git a/forester/src/cli.rs b/forester/src/cli.rs index f64c03322f..9bad9986c7 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -17,96 +17,88 @@ pub enum Commands { #[derive(Parser, Clone, Debug)] pub struct StartArgs { - #[arg(long, env = "FORESTER_RPC_URL")] + #[arg(long, env = "RPC_URL")] pub rpc_url: Option, - #[arg(long, env = "FORESTER_PUSH_GATEWAY_URL")] + #[arg(long, env = "PUSH_GATEWAY_URL")] pub push_gateway_url: Option, - #[arg(long, env = "FORESTER_PAGERDUTY_ROUTING_KEY")] + #[arg(long, env = "PAGERDUTY_ROUTING_KEY")] pub pagerduty_routing_key: Option, - #[arg(long, env = "FORESTER_WS_RPC_URL")] + #[arg(long, env = "WS_RPC_URL")] pub ws_rpc_url: Option, - #[arg(long, env = "FORESTER_INDEXER_URL")] + #[arg(long, env = "INDEXER_URL")] pub indexer_url: Option, - #[arg(long, env 
= "FORESTER_PROVER_URL")] + #[arg(long, env = "PROVER_URL")] pub prover_url: Option, #[arg( long, - env = "FORESTER_PROVER_APPEND_URL", + env = "PROVER_APPEND_URL", help = "Prover URL for append operations. If not specified, uses prover_url" )] pub prover_append_url: Option, #[arg( long, - env = "FORESTER_PROVER_UPDATE_URL", + env = "PROVER_UPDATE_URL", help = "Prover URL for update operations. If not specified, uses prover_url" )] pub prover_update_url: Option, #[arg( long, - env = "FORESTER_PROVER_ADDRESS_APPEND_URL", + env = "PROVER_ADDRESS_APPEND_URL", help = "Prover URL for address-append operations. If not specified, uses prover_url" )] pub prover_address_append_url: Option, - #[arg(long, env = "FORESTER_PROVER_API_KEY")] + #[arg(long, env = "PROVER_API_KEY")] pub prover_api_key: Option, #[arg( long, - env = "FORESTER_PROVER_POLLING_INTERVAL_MS", + env = "PROVER_POLLING_INTERVAL_MS", help = "Prover polling interval in milliseconds (default: 1000)" )] pub prover_polling_interval_ms: Option, #[arg( long, - env = "FORESTER_PROVER_MAX_WAIT_TIME_SECS", + env = "PROVER_MAX_WAIT_TIME_SECS", help = "Maximum time to wait for prover response in seconds (default: 600)" )] pub prover_max_wait_time_secs: Option, - #[arg(long, env = "FORESTER_PAYER")] + #[arg(long, env = "PAYER")] pub payer: Option, - #[arg(long, env = "FORESTER_DERIVATION_PUBKEY")] + #[arg(long, env = "DERIVATION_PUBKEY")] pub derivation: Option, - #[arg(long, env = "FORESTER_PHOTON_API_KEY")] + #[arg(long, env = "PHOTON_API_KEY")] pub photon_api_key: Option, - #[arg(long, env = "FORESTER_PHOTON_GRPC_URL")] + #[arg(long, env = "PHOTON_GRPC_URL")] pub photon_grpc_url: Option, - #[arg(long, env = "FORESTER_INDEXER_BATCH_SIZE", default_value = "50")] + #[arg(long, env = "INDEXER_BATCH_SIZE", default_value = "50")] pub indexer_batch_size: usize, - #[arg( - long, - env = "FORESTER_INDEXER_MAX_CONCURRENT_BATCHES", - default_value = "10" - )] + #[arg(long, env = "INDEXER_MAX_CONCURRENT_BATCHES", default_value = "10")] pub indexer_max_concurrent_batches: usize, - #[arg(long, env = "FORESTER_LEGACY_XS_PER_TX", default_value = "1")] + #[arg(long, env = "LEGACY_XS_PER_TX", default_value = "1")] pub legacy_ixs_per_tx: usize, - #[arg( - long, - env = "FORESTER_TRANSACTION_MAX_CONCURRENT_BATCHES", - default_value = "20" - )] + #[arg(long, env = "TRANSACTION_MAX_CONCURRENT_BATCHES", default_value = "20")] pub transaction_max_concurrent_batches: usize, #[arg( long, - env = "FORESTER_MAX_CONCURRENT_SENDS", + env = "MAX_CONCURRENT_SENDS", default_value = "50", help = "Maximum number of concurrent transaction sends per batch" )] @@ -114,7 +106,7 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_TX_CACHE_TTL_SECONDS", + env = "TX_CACHE_TTL_SECONDS", default_value = "180", help = "TTL in seconds to prevent duplicate transaction processing" )] @@ -122,7 +114,7 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_OPS_CACHE_TTL_SECONDS", + env = "OPS_CACHE_TTL_SECONDS", default_value = "180", help = "TTL in seconds to prevent duplicate batch operations processing" )] @@ -130,7 +122,7 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_CONFIRMATION_MAX_ATTEMPTS", + env = "CONFIRMATION_MAX_ATTEMPTS", default_value = "60", help = "Maximum attempts to confirm a transaction before timing out" )] @@ -138,107 +130,75 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_CONFIRMATION_POLL_INTERVAL_MS", + env = "CONFIRMATION_POLL_INTERVAL_MS", default_value = "500", help = "Interval between confirmation polling attempts in milliseconds" )] pub 
confirmation_poll_interval_ms: u64, - #[arg(long, env = "FORESTER_CU_LIMIT", default_value = "1000000")] + #[arg(long, env = "CU_LIMIT", default_value = "1000000")] pub cu_limit: u32, - #[arg(long, env = "FORESTER_ENABLE_PRIORITY_FEES", default_value = "false")] + #[arg(long, env = "ENABLE_PRIORITY_FEES", default_value = "false")] pub enable_priority_fees: bool, - #[arg(long, env = "FORESTER_RPC_POOL_SIZE", default_value = "100")] + #[arg(long, env = "RPC_POOL_SIZE", default_value = "100")] pub rpc_pool_size: u32, - #[arg( - long, - env = "FORESTER_RPC_POOL_CONNECTION_TIMEOUT_SECS", - default_value = "15" - )] + #[arg(long, env = "RPC_POOL_CONNECTION_TIMEOUT_SECS", default_value = "15")] pub rpc_pool_connection_timeout_secs: u64, - #[arg( - long, - env = "FORESTER_RPC_POOL_IDLE_TIMEOUT_SECS", - default_value = "300" - )] + #[arg(long, env = "RPC_POOL_IDLE_TIMEOUT_SECS", default_value = "300")] pub rpc_pool_idle_timeout_secs: u64, - #[arg(long, env = "FORESTER_RPC_POOL_MAX_RETRIES", default_value = "100")] + #[arg(long, env = "RPC_POOL_MAX_RETRIES", default_value = "100")] pub rpc_pool_max_retries: u32, - #[arg( - long, - env = "FORESTER_RPC_POOL_INITIAL_RETRY_DELAY_MS", - default_value = "1000" - )] + #[arg(long, env = "RPC_POOL_INITIAL_RETRY_DELAY_MS", default_value = "1000")] pub rpc_pool_initial_retry_delay_ms: u64, - #[arg( - long, - env = "FORESTER_RPC_POOL_MAX_RETRY_DELAY_MS", - default_value = "16000" - )] + #[arg(long, env = "RPC_POOL_MAX_RETRY_DELAY_MS", default_value = "16000")] pub rpc_pool_max_retry_delay_ms: u64, - #[arg( - long, - env = "FORESTER_SLOT_UPDATE_INTERVAL_SECONDS", - default_value = "10" - )] + #[arg(long, env = "SLOT_UPDATE_INTERVAL_SECONDS", default_value = "10")] pub slot_update_interval_seconds: u64, - #[arg( - long, - env = "FORESTER_TREE_DISCOVERY_INTERVAL_SECONDS", - default_value = "5" - )] + #[arg(long, env = "TREE_DISCOVERY_INTERVAL_SECONDS", default_value = "5")] pub tree_discovery_interval_seconds: u64, - #[arg(long, env = "FORESTER_MAX_RETRIES", default_value = "3")] + #[arg(long, env = "MAX_RETRIES", default_value = "3")] pub max_retries: u32, - #[arg(long, env = "FORESTER_RETRY_DELAY", default_value = "1000")] + #[arg(long, env = "RETRY_DELAY", default_value = "1000")] pub retry_delay: u64, - #[arg(long, env = "FORESTER_RETRY_TIMEOUT", default_value = "30000")] + #[arg(long, env = "RETRY_TIMEOUT", default_value = "30000")] pub retry_timeout: u64, - #[arg(long, env = "FORESTER_STATE_QUEUE_START_INDEX", default_value = "0")] + #[arg(long, env = "STATE_QUEUE_START_INDEX", default_value = "0")] pub state_queue_start_index: u16, - #[arg( - long, - env = "FORESTER_STATE_PROCESSING_LENGTH", - default_value = "28807" - )] + #[arg(long, env = "STATE_PROCESSING_LENGTH", default_value = "28807")] pub state_queue_processing_length: u16, - #[arg(long, env = "FORESTER_ADDRESS_QUEUE_START_INDEX", default_value = "0")] + #[arg(long, env = "ADDRESS_QUEUE_START_INDEX", default_value = "0")] pub address_queue_start_index: u16, - #[arg( - long, - env = "FORESTER_ADDRESS_PROCESSING_LENGTH", - default_value = "28807" - )] + #[arg(long, env = "ADDRESS_PROCESSING_LENGTH", default_value = "28807")] pub address_queue_processing_length: u16, - #[arg(long, env = "FORESTER_RPC_RATE_LIMIT")] + #[arg(long, env = "RPC_RATE_LIMIT")] pub rpc_rate_limit: Option, - #[arg(long, env = "FORESTER_PHOTON_RATE_LIMIT")] + #[arg(long, env = "PHOTON_RATE_LIMIT")] pub photon_rate_limit: Option, - #[arg(long, env = "FORESTER_SEND_TRANSACTION_RATE_LIMIT")] + #[arg(long, env = 
"SEND_TRANSACTION_RATE_LIMIT")] pub send_tx_rate_limit: Option, #[arg( long, - env = "FORESTER_PROCESSOR_MODE", + env = "PROCESSOR_MODE", default_value_t = ProcessorMode::All, help = "Processor mode: v1 (process only v1 trees), v2 (process only v2 trees), all (process all trees)" )] @@ -246,7 +206,7 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_QUEUE_POLLING_MODE", + env = "QUEUE_POLLING_MODE", default_value_t = QueuePollingMode::Indexer, help = "Queue polling mode: indexer (poll indexer API, requires indexer_url), onchain (read queue status directly from RPC)" )] @@ -254,7 +214,7 @@ pub struct StartArgs { #[arg( long = "tree-id", - env = "FORESTER_TREE_IDS", + env = "TREE_IDS", help = "Process only the specified trees (Pubkeys). Can be specified multiple times. If specified, forester will process only these trees and ignore all others", value_delimiter = ',' )] @@ -262,7 +222,7 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_ENABLE_COMPRESSIBLE", + env = "ENABLE_COMPRESSIBLE", help = "Enable compressible account tracking and compression using ws_rpc_url (requires --ws-rpc-url)", default_value = "false" )] @@ -270,20 +230,39 @@ pub struct StartArgs { #[arg( long, - env = "FORESTER_LOOKUP_TABLE_ADDRESS", + env = "LOOKUP_TABLE_ADDRESS", help = "Address lookup table pubkey for versioned transactions. If not provided, legacy transactions will be used." )] pub lookup_table_address: Option, + + #[arg( + long, + env = "API_SERVER_PORT", + help = "HTTP API server port (default: 8080)", + default_value = "8080" + )] + pub api_server_port: u16, + + #[arg( + long, + env = "GROUP_AUTHORITY", + help = "Filter trees by group authority pubkey. Only process trees owned by this authority." + )] + pub group_authority: Option, } #[derive(Parser, Clone, Debug)] pub struct StatusArgs { - #[arg(long, env = "FORESTER_RPC_URL")] + #[arg(long, env = "RPC_URL", value_name = "RPC_URL", alias = "RPC_URL")] pub rpc_url: String, - #[arg(long, env = "FORESTER_PUSH_GATEWAY_URL")] + #[arg(long, env = "PUSH_GATEWAY_URL", value_name = "PUSH_GATEWAY_URL")] pub push_gateway_url: Option, - #[arg(long, env = "FORESTER_PAGERDUTY_ROUTING_KEY")] + #[arg( + long, + env = "PAGERDUTY_ROUTING_KEY", + value_name = "PAGERDUTY_ROUTING_KEY" + )] pub pagerduty_routing_key: Option, /// Select to run compressed token program tests. 
#[clap(long)] @@ -314,13 +293,13 @@ pub struct HealthArgs { #[arg(long, help = "Check forester registration for current epoch")] pub check_registration: bool, - #[arg(long, env = "FORESTER_RPC_URL")] + #[arg(long, env = "RPC_URL")] pub rpc_url: Option, - #[arg(long, env = "FORESTER_PAYER")] + #[arg(long, env = "PAYER")] pub payer: Option, - #[arg(long, env = "FORESTER_DERIVATION_PUBKEY")] + #[arg(long, env = "DERIVATION_PUBKEY")] pub derivation: Option, #[arg( diff --git a/forester/src/config.rs b/forester/src/config.rs index dd7c96121c..2286c6e7b6 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -95,6 +95,7 @@ pub struct GeneralConfig { pub sleep_after_processing_ms: u64, pub sleep_when_idle_ms: u64, pub queue_polling_mode: QueuePollingMode, + pub group_authority: Option, } impl Default for GeneralConfig { @@ -111,6 +112,7 @@ impl Default for GeneralConfig { sleep_after_processing_ms: 10_000, sleep_when_idle_ms: 45_000, queue_polling_mode: QueuePollingMode::Indexer, + group_authority: None, } } } @@ -129,6 +131,7 @@ impl GeneralConfig { sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, queue_polling_mode: QueuePollingMode::Indexer, + group_authority: None, } } @@ -145,6 +148,7 @@ impl GeneralConfig { sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, queue_polling_mode: QueuePollingMode::Indexer, + group_authority: None, } } } @@ -325,6 +329,16 @@ impl ForesterConfig { sleep_after_processing_ms: 10_000, sleep_when_idle_ms: 45_000, queue_polling_mode: args.queue_polling_mode, + group_authority: args + .group_authority + .as_ref() + .map(|s| { + Pubkey::from_str(s).map_err(|e| ConfigError::InvalidArguments { + field: "group_authority", + invalid_values: vec![e.to_string()], + }) + }) + .transpose()?, }, rpc_pool_config: RpcPoolConfig { max_size: args.rpc_pool_size, @@ -415,6 +429,7 @@ impl ForesterConfig { sleep_after_processing_ms: 10_000, sleep_when_idle_ms: 45_000, queue_polling_mode: QueuePollingMode::OnChain, // Status uses on-chain reads + group_authority: None, }, rpc_pool_config: RpcPoolConfig { max_size: 10, diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 9c1ad6a059..12abd9b5d3 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -72,7 +72,7 @@ use crate::{ perform_state_merkle_tree_rollover_forester, }, slot_tracker::{slot_duration, wait_until_slot_reached, SlotTracker}, - tree_data_sync::fetch_trees, + tree_data_sync::{fetch_protocol_group_authority, fetch_trees}, ForesterConfig, ForesterEpochInfo, Result, }; @@ -1144,6 +1144,7 @@ impl EpochManager { queue: solana_sdk::pubkey::Pubkey::default(), tree_type: TreeType::Unknown, is_rolledover: false, + owner: solana_sdk::pubkey::Pubkey::default(), }; let tree_schedule = TreeForesterSchedule::new_with_schedule( &compression_tree_accounts, @@ -1509,7 +1510,7 @@ impl EpochManager { .check_forester_eligibility( epoch_pda, current_light_slot, - &tree_accounts.queue, + &tree_accounts.merkle_tree, epoch_info.epoch, epoch_info, ) @@ -2816,6 +2817,36 @@ pub async fn run_service( fetch_result = fetch_trees(&*rpc) => { match fetch_result { Ok(mut fetched_trees) => { + let group_authority = match config.general_config.group_authority { + Some(ga) => Some(ga), + None => { + match fetch_protocol_group_authority(&*rpc).await { + Ok(ga) => { + info!("Using protocol default group authority: {}", ga); + Some(ga) + } + Err(e) => { + warn!( + "Failed to fetch protocol group authority, processing all trees: {:?}", + e + ); + None + } + } + } + }; + + if let 
Some(group_authority) = group_authority { + let before_count = fetched_trees.len(); + fetched_trees.retain(|tree| tree.owner == group_authority); + info!( + "Filtered trees by group authority {}: {} -> {} trees", + group_authority, + before_count, + fetched_trees.len() + ); + } + if !config.general_config.tree_ids.is_empty() { let tree_ids = &config.general_config.tree_ids; fetched_trees.retain(|tree| tree_ids.contains(&tree.merkle_tree)); @@ -3058,6 +3089,7 @@ mod tests { sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, queue_polling_mode: crate::cli::QueuePollingMode::Indexer, + group_authority: None, }, rpc_pool_config: Default::default(), registry_pubkey: Pubkey::default(), @@ -3158,6 +3190,7 @@ mod tests { queue: Pubkey::new_unique(), is_rolledover: false, tree_type: TreeType::AddressV1, + owner: Default::default(), }; let work_item = WorkItem { @@ -3179,6 +3212,7 @@ mod tests { queue: Pubkey::new_unique(), is_rolledover: false, tree_type: TreeType::StateV1, + owner: Default::default(), }; let work_item = WorkItem { diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 9c6b4c74eb..756274b6e9 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -1,25 +1,607 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; +use account_compression::{AddressMerkleTreeAccount, QueueAccount, StateMerkleTreeAccount}; use anchor_lang::{AccountDeserialize, Discriminator}; use anyhow::Context; -use forester_utils::forester_epoch::{get_epoch_phases, TreeAccounts}; +use borsh::BorshDeserialize; +use forester_utils::{ + account_zero_copy::{ + parse_concurrent_merkle_tree_from_bytes, parse_hash_set_from_bytes, + parse_indexed_merkle_tree_from_bytes, + }, + forester_epoch::{get_epoch_phases, TreeAccounts}, +}; use itertools::Itertools; +use light_batched_merkle_tree::merkle_tree::BatchedMerkleTreeAccount; use light_client::rpc::{LightClient, LightClientConfig, Rpc}; use light_compressed_account::TreeType; +use light_hasher::Poseidon; use light_registry::{protocol_config::state::ProtocolConfigPda, EpochPda, ForesterEpochPda}; +use serde::{Deserialize, Serialize}; use solana_program::{clock::Slot, pubkey::Pubkey}; -use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig}; +use solana_sdk::{ + account::{Account, ReadableAccount}, + commitment_config::CommitmentConfig, +}; use tracing::{debug, warn}; use crate::{ cli::StatusArgs, metrics::{push_metrics, register_metrics, update_registered_foresters}, + queue_helpers::{parse_address_v2_queue_info, parse_state_v2_queue_info, V2QueueInfo}, rollover::get_tree_fullness, run_queue_info, - tree_data_sync::fetch_trees, + tree_data_sync::{fetch_protocol_group_authority, fetch_trees}, ForesterConfig, }; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForesterInfo { + pub authority: String, + pub balance_sol: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ForesterStatus { + pub slot: u64, + pub current_active_epoch: u64, + pub current_registration_epoch: u64, + pub active_epoch_progress: u64, + pub active_phase_length: u64, + pub active_epoch_progress_percentage: f64, + pub hours_until_next_epoch: u64, + pub slots_until_next_registration: u64, + pub hours_until_next_registration: u64, + pub active_epoch_foresters: Vec, + pub registration_epoch_foresters: Vec, + pub trees: Vec, + /// Current light slot index (None if not in active phase) + pub current_light_slot: Option, + /// Solana slots per light slot (forester rotation 
interval) + pub light_slot_length: u64, + /// Slots remaining until next light slot (forester rotation) + pub slots_until_next_light_slot: Option, + /// Total number of light slots in the active phase + pub total_light_slots: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TreeStatus { + pub tree_type: String, + pub merkle_tree: String, + pub queue: String, + pub fullness_percentage: f64, + pub next_index: u64, + pub threshold: u64, + pub is_rolledover: bool, + pub queue_length: Option, + pub v2_queue_info: Option, + /// Currently assigned forester for this tree (in current light slot) + pub assigned_forester: Option, + /// Schedule: forester index (into active_epoch_foresters) for each light slot + /// None means no forester assigned for that slot + pub schedule: Vec>, + /// Owner (group authority) of the tree + pub owner: String, +} + +pub fn get_forester_status_blocking(rpc_url: &str) -> crate::Result { + tokio::runtime::Runtime::new() + .context("Failed to create tokio runtime")? + .block_on(get_forester_status_async(rpc_url)) +} + +async fn get_forester_status_async(rpc_url: &str) -> crate::Result { + let rpc = LightClient::new(LightClientConfig { + url: rpc_url.to_string(), + photon_url: None, + api_key: None, + commitment_config: None, + fetch_active_tree: false, + }) + .await + .context("Failed to create LightClient")?; + + // Phase 1: Fetch registry accounts and slot in parallel + let (registry_result, slot_result) = + tokio::join!(fetch_registry_accounts_filtered(&rpc), rpc.get_slot(),); + + let (forester_epoch_pdas, _epoch_pdas, protocol_config_pdas) = registry_result?; + let slot = slot_result.context("Failed to get slot")?; + + let protocol_config_pda = protocol_config_pdas + .first() + .cloned() + .context("No ProtocolConfigPda found in registry program accounts")?; + + let current_active_epoch = protocol_config_pda.config.get_current_active_epoch(slot)?; + let current_registration_epoch = protocol_config_pda.config.get_latest_register_epoch(slot)?; + + let active_epoch_progress = protocol_config_pda + .config + .get_current_active_epoch_progress(slot); + let active_phase_length = protocol_config_pda.config.active_phase_length; + let active_epoch_progress_percentage = + active_epoch_progress as f64 / active_phase_length as f64 * 100f64; + + let hours_until_next_epoch = + active_phase_length.saturating_sub(active_epoch_progress) * 460 / 1000 / 3600; + + let slots_until_next_registration = protocol_config_pda + .config + .registration_phase_length + .saturating_sub(active_epoch_progress); + let hours_until_next_registration = slots_until_next_registration * 460 / 1000 / 3600; + + // Collect forester authorities for both epochs + let active_forester_authorities: Vec = forester_epoch_pdas + .iter() + .filter(|pda| pda.epoch == current_active_epoch) + .map(|pda| pda.authority) + .collect(); + + let registration_forester_authorities: Vec = forester_epoch_pdas + .iter() + .filter(|pda| pda.epoch == current_registration_epoch) + .map(|pda| pda.authority) + .collect(); + + // Fetch all forester balances in one batch call using Rpc trait + let all_forester_pubkeys: Vec = active_forester_authorities + .iter() + .chain(registration_forester_authorities.iter()) + .cloned() + .collect(); + + let forester_balances = fetch_forester_balances(&rpc, &all_forester_pubkeys).await; + + // Build ForesterInfo with balances + let active_epoch_foresters: Vec = active_forester_authorities + .iter() + .map(|authority| { + let balance = 
forester_balances.get(authority).copied().flatten(); + ForesterInfo { + authority: authority.to_string(), + balance_sol: balance, + } + }) + .collect(); + + let registration_epoch_foresters: Vec = registration_forester_authorities + .iter() + .map(|authority| { + let balance = forester_balances.get(authority).copied().flatten(); + ForesterInfo { + authority: authority.to_string(), + balance_sol: balance, + } + }) + .collect(); + + // Phase 2: Fetch trees using existing optimized method + let mut trees = match fetch_trees(&rpc).await { + Ok(trees) => trees, + Err(e) => { + warn!("Failed to fetch trees: {:?}", e); + return Ok(ForesterStatus { + slot, + current_active_epoch, + current_registration_epoch, + active_epoch_progress, + active_phase_length, + active_epoch_progress_percentage, + hours_until_next_epoch, + slots_until_next_registration, + hours_until_next_registration, + active_epoch_foresters, + registration_epoch_foresters, + trees: vec![], + current_light_slot: None, + light_slot_length: protocol_config_pda.config.slot_length, + slots_until_next_light_slot: None, + total_light_slots: 0, + }); + } + }; + + // Filter trees by protocol group authority + if let Ok(group_authority) = fetch_protocol_group_authority(&rpc).await { + let before_count = trees.len(); + trees.retain(|tree| tree.owner == group_authority); + debug!( + "Filtered trees by group authority {}: {} -> {} trees", + group_authority, + before_count, + trees.len() + ); + } else { + warn!("Failed to fetch protocol group authority, showing all trees"); + } + + // Phase 3: Batch fetch all tree and queue accounts using Rpc trait + let mut tree_statuses = fetch_tree_statuses_batched(&rpc, &trees).await; + + // Phase 4: Compute light slot info and forester assignments + let light_slot_length = protocol_config_pda.config.slot_length; + let mut current_light_slot: Option = None; + let mut slots_until_next_light_slot: Option = None; + let mut total_light_slots: u64 = 0; + + let active_epoch_forester_pdas: Vec<&ForesterEpochPda> = forester_epoch_pdas + .iter() + .filter(|pda| pda.epoch == current_active_epoch) + .collect(); + + // Build authority -> index map for schedule + let authority_to_index: HashMap = active_epoch_foresters + .iter() + .enumerate() + .map(|(i, f)| (f.authority.clone(), i)) + .collect(); + + if !active_epoch_forester_pdas.is_empty() { + if let Some(total_epoch_weight) = active_epoch_forester_pdas + .first() + .and_then(|pda| pda.total_epoch_weight) + .filter(|&w| w > 0) + { + let epoch_phases = get_epoch_phases(&protocol_config_pda.config, current_active_epoch); + + if light_slot_length > 0 { + total_light_slots = epoch_phases.active.length() / light_slot_length; + + // Compute current light slot if in active phase + if slot >= epoch_phases.active.start && slot < epoch_phases.active.end { + let current_light_slot_index = + (slot - epoch_phases.active.start) / light_slot_length; + current_light_slot = Some(current_light_slot_index); + + // Calculate slots until next light slot + let next_light_slot_start = epoch_phases.active.start + + (current_light_slot_index + 1) * light_slot_length; + slots_until_next_light_slot = Some(next_light_slot_start.saturating_sub(slot)); + } + + // Build full schedule for each tree + for status in &mut tree_statuses { + let queue_pubkey: Pubkey = status.queue.parse().unwrap_or_default(); + let mut schedule: Vec> = + Vec::with_capacity(total_light_slots as usize); + + for light_slot_idx in 0..total_light_slots { + let forester_idx = ForesterEpochPda::get_eligible_forester_index( + 
light_slot_idx, + &queue_pubkey, + total_epoch_weight, + current_active_epoch, + ) + .ok() + .and_then(|eligible_idx| { + active_epoch_forester_pdas + .iter() + .find(|pda| pda.is_eligible(eligible_idx)) + .and_then(|pda| authority_to_index.get(&pda.authority.to_string())) + .copied() + }); + schedule.push(forester_idx); + } + + // Set current assigned forester + if let Some(current_idx) = current_light_slot { + if let Some(Some(forester_idx)) = schedule.get(current_idx as usize) { + status.assigned_forester = + Some(active_epoch_foresters[*forester_idx].authority.clone()); + } + } + + status.schedule = schedule; + } + } + } + } + + Ok(ForesterStatus { + slot, + current_active_epoch, + current_registration_epoch, + active_epoch_progress, + active_phase_length, + active_epoch_progress_percentage, + hours_until_next_epoch, + slots_until_next_registration, + hours_until_next_registration, + active_epoch_foresters, + registration_epoch_foresters, + trees: tree_statuses, + current_light_slot, + light_slot_length, + slots_until_next_light_slot, + total_light_slots, + }) +} + +async fn fetch_registry_accounts_filtered( + rpc: &R, +) -> crate::Result<(Vec, Vec, Vec)> { + let program_id = light_registry::ID; + + let (forester_result, epoch_result, config_result) = tokio::join!( + rpc.get_program_accounts_with_discriminator(&program_id, ForesterEpochPda::DISCRIMINATOR), + rpc.get_program_accounts_with_discriminator(&program_id, EpochPda::DISCRIMINATOR), + rpc.get_program_accounts_with_discriminator(&program_id, ProtocolConfigPda::DISCRIMINATOR), + ); + + let mut forester_epoch_pdas = Vec::new(); + let mut epoch_pdas = Vec::new(); + let mut protocol_config_pdas = Vec::new(); + + if let Ok(accounts) = forester_result { + for (_, account) in accounts { + let mut data: &[u8] = &account.data; + if let Ok(pda) = ForesterEpochPda::try_deserialize_unchecked(&mut data) { + forester_epoch_pdas.push(pda); + } + } + } + + if let Ok(accounts) = epoch_result { + for (_, account) in accounts { + let mut data: &[u8] = &account.data; + if let Ok(pda) = EpochPda::try_deserialize_unchecked(&mut data) { + epoch_pdas.push(pda); + } + } + } + + if let Ok(accounts) = config_result { + for (_, account) in accounts { + let mut data: &[u8] = &account.data; + if let Ok(pda) = ProtocolConfigPda::try_deserialize_unchecked(&mut data) { + protocol_config_pdas.push(pda); + } + } + } + + forester_epoch_pdas.sort_by(|a, b| a.epoch.cmp(&b.epoch)); + epoch_pdas.sort_by(|a, b| a.epoch.cmp(&b.epoch)); + + Ok((forester_epoch_pdas, epoch_pdas, protocol_config_pdas)) +} + +async fn fetch_tree_statuses_batched(rpc: &R, trees: &[TreeAccounts]) -> Vec { + if trees.is_empty() { + return vec![]; + } + + let mut pubkeys: Vec = Vec::with_capacity(trees.len() * 2); + let mut pubkey_map: Vec<(usize, &str)> = Vec::with_capacity(trees.len() * 2); + + for (i, tree) in trees.iter().enumerate() { + pubkeys.push(tree.merkle_tree); + pubkey_map.push((i, "merkle_tree")); + + if tree.tree_type != TreeType::AddressV2 { + pubkeys.push(tree.queue); + pubkey_map.push((i, "queue")); + } + } + + let accounts = match rpc.get_multiple_accounts(&pubkeys).await { + Ok(accounts) => accounts, + Err(e) => { + tracing::warn!("Failed to batch fetch accounts: {:?}", e); + return vec![]; + } + }; + + let mut tree_accounts: Vec<(Option, Option)> = + vec![(None, None); trees.len()]; + + for (idx, (tree_idx, account_type)) in pubkey_map.iter().enumerate() { + if let Some(Some(account)) = accounts.get(idx) { + match *account_type { + "merkle_tree" => tree_accounts[*tree_idx].0 
= Some(account.clone()), + "queue" => tree_accounts[*tree_idx].1 = Some(account.clone()), + _ => {} + } + } + } + + let mut tree_statuses = Vec::with_capacity(trees.len()); + + for (i, tree) in trees.iter().enumerate() { + let (merkle_account, queue_account) = &tree_accounts[i]; + + match parse_tree_status(tree, merkle_account.clone(), queue_account.clone()) { + Ok(status) => tree_statuses.push(status), + Err(e) => { + tracing::warn!( + "Failed to parse tree status for {}: {:?}", + tree.merkle_tree, + e + ); + } + } + } + + tree_statuses +} + +async fn fetch_forester_balances( + rpc: &R, + pubkeys: &[Pubkey], +) -> HashMap> { + let mut balances = HashMap::new(); + + if pubkeys.is_empty() { + return balances; + } + + match rpc.get_multiple_accounts(pubkeys).await { + Ok(accounts) => { + for (i, account_opt) in accounts.iter().enumerate() { + if let Some(pubkey) = pubkeys.get(i) { + let balance = account_opt + .as_ref() + .map(|acc| acc.lamports as f64 / 1_000_000_000.0); + balances.insert(*pubkey, balance); + } + } + } + Err(e) => { + tracing::warn!("Failed to fetch forester balances: {:?}", e); + for pubkey in pubkeys { + balances.insert(*pubkey, None); + } + } + } + + balances +} + +fn parse_tree_status( + tree: &TreeAccounts, + merkle_account: Option, + queue_account: Option, +) -> crate::Result { + let mut merkle_account = + merkle_account.ok_or_else(|| anyhow::anyhow!("Merkle tree account not found"))?; + + let (fullness_percentage, next_index, threshold, queue_length, v2_queue_info) = match tree + .tree_type + { + TreeType::StateV1 => { + let tree_account = StateMerkleTreeAccount::deserialize(&mut &merkle_account.data[8..]) + .map_err(|e| anyhow::anyhow!("Failed to deserialize StateV1 metadata: {}", e))?; + + let height = 26u64; + let capacity = 1u64 << height; + let threshold_val = capacity + .saturating_mul(tree_account.metadata.rollover_metadata.rollover_threshold) + / 100; + + let merkle_tree = + parse_concurrent_merkle_tree_from_bytes::( + &merkle_account.data, + ) + .map_err(|e| anyhow::anyhow!("Failed to parse StateV1 tree: {:?}", e))?; + + let next_index = merkle_tree.next_index() as u64; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let queue_len = queue_account.and_then(|acc| { + unsafe { parse_hash_set_from_bytes::(&acc.data) } + .ok() + .map(|hs| { + hs.iter() + .filter(|(_, cell)| cell.sequence_number.is_none()) + .count() as u64 + }) + }); + + (fullness, next_index, threshold_val, queue_len, None) + } + TreeType::AddressV1 => { + let height = 26u64; + let capacity = 1u64 << height; + + let threshold_val = queue_account + .as_ref() + .and_then(|acc| QueueAccount::deserialize(&mut &acc.data[8..]).ok()) + .map(|q| { + capacity.saturating_mul(q.metadata.rollover_metadata.rollover_threshold) / 100 + }) + .unwrap_or(0); + + let merkle_tree = parse_indexed_merkle_tree_from_bytes::< + AddressMerkleTreeAccount, + Poseidon, + usize, + 26, + 16, + >(&merkle_account.data) + .map_err(|e| anyhow::anyhow!("Failed to parse AddressV1 tree: {:?}", e))?; + + let next_index = merkle_tree.next_index().saturating_sub(3) as u64; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let queue_len = queue_account.and_then(|acc| { + unsafe { parse_hash_set_from_bytes::(&acc.data) } + .ok() + .map(|hs| { + hs.iter() + .filter(|(_, cell)| cell.sequence_number.is_none()) + .count() as u64 + }) + }); + + (fullness, next_index, threshold_val, queue_len, None) + } + TreeType::StateV2 => { + let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( + &mut 
merkle_account.data, + &tree.merkle_tree.into(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse StateV2 tree: {:?}", e))?; + + let height = merkle_tree.height as u64; + let capacity = 1u64 << height; + let threshold_val = + (1u64 << height) * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; + let next_index = merkle_tree.next_index; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let v2_info = queue_account + .and_then(|mut acc| parse_state_v2_queue_info(&merkle_tree, &mut acc.data).ok()); + let queue_len = v2_info + .as_ref() + .map(|i| (i.input_pending_batches + i.output_pending_batches) * i.zkp_batch_size); + + (fullness, next_index, threshold_val, queue_len, v2_info) + } + TreeType::AddressV2 => { + let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( + &mut merkle_account.data, + &tree.merkle_tree.into(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse AddressV2 tree: {:?}", e))?; + + let height = merkle_tree.height as u64; + let capacity = 1u64 << height; + let threshold_val = + capacity * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; + let fullness = merkle_tree.next_index as f64 / capacity as f64 * 100.0; + + let v2_info = parse_address_v2_queue_info(&merkle_tree); + let queue_len = Some(v2_info.input_pending_batches * v2_info.zkp_batch_size); + + ( + fullness, + merkle_tree.next_index, + threshold_val, + queue_len, + Some(v2_info), + ) + } + TreeType::Unknown => (0.0, 0, 0, None, None), + }; + + Ok(TreeStatus { + tree_type: tree.tree_type.to_string(), + merkle_tree: tree.merkle_tree.to_string(), + queue: tree.queue.to_string(), + fullness_percentage, + next_index, + threshold, + is_rolledover: tree.is_rolledover, + queue_length, + v2_queue_info, + assigned_forester: None, + schedule: Vec::new(), + owner: tree.owner.to_string(), + }) +} + pub async fn fetch_forester_status(args: &StatusArgs) -> crate::Result<()> { let commitment_config = CommitmentConfig::confirmed(); diff --git a/forester/src/lib.rs b/forester/src/lib.rs index e44e07f36c..55c53e24da 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -1,5 +1,6 @@ pub type Result = anyhow::Result; +pub mod api_server; pub mod cli; pub mod compressible; pub mod config; @@ -22,7 +23,6 @@ pub mod utils; use std::{sync::Arc, time::Duration}; -use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES}; pub use config::{ForesterConfig, ForesterEpochInfo}; use forester_utils::{ forester_epoch::TreeAccounts, rate_limiter::RateLimiter, rpc_pool::SolanaRpcPoolBuilder, @@ -69,15 +69,9 @@ pub async fn run_queue_info( for tree_data in trees { match tree_data.tree_type { TreeType::StateV1 => { - let queue_length = fetch_queue_item_data( - &mut rpc, - &tree_data.queue, - 0, - STATE_NULLIFIER_QUEUE_VALUES, - STATE_NULLIFIER_QUEUE_VALUES, - ) - .await? - .len(); + let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0) + .await? + .len(); QUEUE_LENGTH .with_label_values(&[ &*queue_type.to_string(), @@ -91,15 +85,9 @@ pub async fn run_queue_info( ); } TreeType::AddressV1 => { - let queue_length = fetch_queue_item_data( - &mut rpc, - &tree_data.queue, - 0, - ADDRESS_QUEUE_VALUES, - ADDRESS_QUEUE_VALUES, - ) - .await? - .len(); + let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0) + .await? 
+ .len(); QUEUE_LENGTH .with_label_values(&[ &*queue_type.to_string(), diff --git a/forester/src/main.rs b/forester/src/main.rs index 48bbb49300..3caf954c7e 100644 --- a/forester/src/main.rs +++ b/forester/src/main.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use clap::Parser; use forester::{ + api_server::spawn_api_server, cli::{Cli, Commands}, errors::ForesterError, forester_status, @@ -54,6 +55,7 @@ where #[tokio::main] #[allow(clippy::result_large_err)] async fn main() -> Result<(), ForesterError> { + dotenvy::dotenv().ok(); setup_telemetry(); let cli = Cli::parse(); @@ -108,6 +110,9 @@ async fn main() -> Result<(), ForesterError> { send_tx_limiter = Some(RateLimiter::new(rate_limit)); } + let rpc_url_for_api: String = config.external_services.rpc_url.to_string(); + spawn_api_server(rpc_url_for_api, args.api_server_port); + run_pipeline::( config, rpc_rate_limiter, diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 555710c8d9..67d1de2fa8 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -6,7 +6,6 @@ use std::{ vec, }; -use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES}; use forester_utils::{forester_epoch::TreeAccounts, rpc_pool::SolanaRpcPool}; use futures::StreamExt; use light_client::rpc::Rpc; @@ -168,47 +167,32 @@ async fn prepare_batch_prerequisites( ) -> Result> { let tree_id_str = tree_accounts.merkle_tree.to_string(); - let (queue_total_capacity, queue_fetch_start_index, queue_fetch_length) = - match tree_accounts.tree_type { - TreeType::StateV1 => ( - STATE_NULLIFIER_QUEUE_VALUES, - config.queue_config.state_queue_start_index, - config.queue_config.state_queue_length, - ), - TreeType::AddressV1 => ( - ADDRESS_QUEUE_VALUES, - config.queue_config.address_queue_start_index, - config.queue_config.address_queue_length, - ), - _ => { - error!( - tree = %tree_id_str, - "prepare_batch_prerequisites called with unsupported tree type: {:?}", - tree_accounts.tree_type - ); - return Err(ForesterError::InvalidTreeType(tree_accounts.tree_type).into()); - } - }; + let queue_fetch_start_index = match tree_accounts.tree_type { + TreeType::StateV1 => config.queue_config.state_queue_start_index, + TreeType::AddressV1 => config.queue_config.address_queue_start_index, + _ => { + error!( + tree = %tree_id_str, + "prepare_batch_prerequisites called with unsupported tree type: {:?}", + tree_accounts.tree_type + ); + return Err(ForesterError::InvalidTreeType(tree_accounts.tree_type).into()); + } + }; let queue_item_data = { let mut rpc = pool.get_connection().await.map_err(|e| { error!(tree = %tree_id_str, "Failed to get RPC for queue data: {:?}", e); ForesterError::RpcPool(e) })?; - fetch_queue_item_data( - &mut *rpc, - &tree_accounts.queue, - queue_fetch_start_index, - queue_fetch_length, - queue_total_capacity, - ) - .await - .map_err(|e| { - error!(tree = %tree_id_str, "Failed to fetch queue item data: {:?}", e); - ForesterError::General { - error: format!("Fetch queue data failed for {}: {}", tree_id_str, e), - } - })? + fetch_queue_item_data(&mut *rpc, &tree_accounts.queue, queue_fetch_start_index) + .await + .map_err(|e| { + error!(tree = %tree_id_str, "Failed to fetch queue item data: {:?}", e); + ForesterError::General { + error: format!("Fetch queue data failed for {}: {}", tree_id_str, e), + } + })? 
}; if queue_item_data.is_empty() { diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index aad6b5984a..599269bc59 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -1,16 +1,143 @@ use account_compression::QueueAccount; use light_batched_merkle_tree::{ + batch::BatchState, constants::{DEFAULT_ADDRESS_ZKP_BATCH_SIZE, DEFAULT_ZKP_BATCH_SIZE}, merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, }; use light_client::rpc::Rpc; use light_hash_set::HashSet; +use serde::{Deserialize, Serialize}; use solana_sdk::pubkey::Pubkey; use tracing::trace; use crate::Result; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct V2QueueInfo { + pub next_index: u64, + pub pending_batch_index: u64, + pub zkp_batch_size: u64, + pub batches: Vec, + pub input_pending_batches: u64, + pub output_pending_batches: u64, + pub input_items_in_current_zkp_batch: u64, + pub output_items_in_current_zkp_batch: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BatchInfo { + pub batch_index: usize, + pub state: String, + pub num_inserted: u64, + pub current_index: u64, + pub pending: u64, + /// Items accumulated in the current partially-filled ZKP batch (0 to zkp_batch_size-1) + pub items_in_current_zkp_batch: u64, +} + +pub fn parse_state_v2_queue_info( + merkle_tree: &BatchedMerkleTreeAccount, + output_queue_data: &mut [u8], +) -> crate::Result { + let output_queue = BatchedQueueAccount::output_from_bytes(output_queue_data) + .map_err(|e| anyhow::anyhow!("Failed to parse StateV2 output queue: {:?}", e))?; + + let next_index = output_queue.batch_metadata.next_index; + let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; + let mut output_pending_batches = 0u64; + let mut batch_infos = Vec::new(); + + for (batch_idx, batch) in output_queue.batch_metadata.batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + batch_infos.push(BatchInfo { + batch_index: batch_idx, + state: format!("{:?}", batch.get_state()), + num_inserted, + current_index, + pending: pending_in_batch, + items_in_current_zkp_batch: batch.get_num_inserted_zkp_batch(), + }); + + output_pending_batches += pending_in_batch; + } + + let mut input_pending_batches = 0u64; + let mut input_items_in_current_zkp_batch = 0u64; + for batch in merkle_tree.queue_batches.batches.iter() { + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + input_pending_batches += pending_in_batch; + + if batch.get_state() == BatchState::Fill { + input_items_in_current_zkp_batch = batch.get_num_inserted_zkp_batch(); + } + } + + let output_items_in_current_zkp_batch = batch_infos + .iter() + .find(|b| b.state == "Fill") + .map(|b| b.items_in_current_zkp_batch) + .unwrap_or(0); + + Ok(V2QueueInfo { + next_index, + pending_batch_index: output_queue.batch_metadata.pending_batch_index, + zkp_batch_size, + batches: batch_infos, + input_pending_batches, + output_pending_batches, + input_items_in_current_zkp_batch, + output_items_in_current_zkp_batch, + }) +} + +pub fn parse_address_v2_queue_info(merkle_tree: &BatchedMerkleTreeAccount) -> V2QueueInfo { + let next_index = merkle_tree.queue_batches.next_index; + let mut zkp_batch_size = DEFAULT_ADDRESS_ZKP_BATCH_SIZE; + let mut 
pending_batches = 0u64; + let mut batch_infos = Vec::new(); + let mut input_items_in_current_zkp_batch = 0u64; + + for (batch_idx, batch) in merkle_tree.queue_batches.batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + if batch.get_state() == BatchState::Fill { + input_items_in_current_zkp_batch = batch.get_num_inserted_zkp_batch(); + } + + batch_infos.push(BatchInfo { + batch_index: batch_idx, + state: format!("{:?}", batch.get_state()), + num_inserted, + current_index, + pending: pending_in_batch, + items_in_current_zkp_batch: batch.get_num_inserted_zkp_batch(), + }); + + pending_batches += pending_in_batch; + } + + V2QueueInfo { + next_index, + pending_batch_index: merkle_tree.queue_batches.pending_batch_index, + zkp_batch_size, + batches: batch_infos, + input_pending_batches: pending_batches, + output_pending_batches: 0, + input_items_in_current_zkp_batch, + output_items_in_current_zkp_batch: 0, + } +} + #[derive(Debug, Clone)] pub struct QueueItemData { pub hash: [u8; 32], @@ -21,8 +148,6 @@ pub async fn fetch_queue_item_data( rpc: &mut R, queue_pubkey: &Pubkey, start_index: u16, - processing_length: u16, - queue_length: u16, ) -> Result> { trace!("Fetching queue data for {:?}", queue_pubkey); let account = rpc.get_account(*queue_pubkey).await?; @@ -47,20 +172,38 @@ pub async fn fetch_queue_item_data( return Ok(Vec::new()); } let queue: HashSet = unsafe { HashSet::from_bytes_copy(&mut account.data[offset..])? }; - let end_index = (start_index + processing_length).min(queue_length); - let filtered_queue = queue + let end_index = queue.get_capacity(); + + let all_items: Vec<(usize, [u8; 32], bool)> = queue .iter() - .filter(|(index, cell)| { - *index >= start_index as usize - && *index < end_index as usize - && cell.sequence_number.is_none() - }) - .map(|(index, cell)| QueueItemData { - hash: cell.value_bytes(), - index, + .map(|(index, cell)| (index, cell.value_bytes(), cell.sequence_number.is_none())) + .collect(); + + let total_items = all_items.len(); + let total_pending = all_items + .iter() + .filter(|(_, _, is_pending)| *is_pending) + .count(); + + let filtered_queue: Vec = all_items + .into_iter() + .filter(|(index, _, is_pending)| { + *index >= start_index as usize && *index < end_index && *is_pending }) + .map(|(index, hash, _)| QueueItemData { hash, index }) .collect(); + + tracing::info!( + "Queue {}: total_items={}, total_pending={}, range={}..{}, filtered_result={}", + queue_pubkey, + total_items, + total_pending, + start_index, + end_index, + filtered_queue.len() + ); + Ok(filtered_queue) } @@ -268,3 +411,70 @@ pub struct QueueUpdate { pub pubkey: Pubkey, pub slot: u64, } + +pub async fn get_address_v2_queue_info( + rpc: &mut R, + merkle_tree_pubkey: &Pubkey, +) -> Result { + if let Some(mut account) = rpc.get_account(*merkle_tree_pubkey).await? { + let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( + account.data.as_mut_slice(), + &(*merkle_tree_pubkey).into(), + )?; + Ok(parse_address_v2_queue_info(&merkle_tree)) + } else { + Err(anyhow::anyhow!("account not found")) + } +} + +pub async fn get_state_v2_output_queue_info( + rpc: &mut R, + queue_pubkey: &Pubkey, +) -> Result { + if let Some(mut account) = rpc.get_account(*queue_pubkey).await? 
{ + let queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; + let next_index = queue.batch_metadata.next_index; + + let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; + let mut total_unprocessed = 0; + let mut batch_infos = Vec::new(); + let mut output_items_in_current_zkp_batch = 0u64; + + for (batch_idx, batch) in queue.batch_metadata.batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + if batch.get_state() == BatchState::Fill { + output_items_in_current_zkp_batch = batch.get_num_inserted_zkp_batch(); + } + + batch_infos.push(BatchInfo { + batch_index: batch_idx, + state: format!("{:?}", batch.get_state()), + num_inserted, + current_index, + pending: pending_in_batch, + items_in_current_zkp_batch: batch.get_num_inserted_zkp_batch(), + }); + + total_unprocessed += pending_in_batch; + } + + let pending_batches = total_unprocessed; + + Ok(V2QueueInfo { + next_index, + pending_batch_index: queue.batch_metadata.pending_batch_index, + zkp_batch_size, + batches: batch_infos, + input_pending_batches: 0, + output_pending_batches: pending_batches, + input_items_in_current_zkp_batch: 0, + output_items_in_current_zkp_batch, + }) + } else { + Err(anyhow::anyhow!("account not found")) + } +} diff --git a/forester/src/tree_data_sync.rs b/forester/src/tree_data_sync.rs index 3f0f2bd3ce..b852e01e59 100644 --- a/forester/src/tree_data_sync.rs +++ b/forester/src/tree_data_sync.rs @@ -1,5 +1,5 @@ use account_compression::{ - utils::check_discriminator::check_discriminator, AddressMerkleTreeAccount, + utils::check_discriminator::check_discriminator, AddressMerkleTreeAccount, RegisteredProgram, StateMerkleTreeAccount, }; use borsh::BorshDeserialize; @@ -10,7 +10,7 @@ use light_compressed_account::TreeType; use light_merkle_tree_metadata::merkle_tree::MerkleTreeMetadata; use serde_json::json; use solana_sdk::{account::Account, pubkey::Pubkey}; -use tracing::{debug, trace, warn}; +use tracing::{debug, info, trace, warn}; use crate::{errors::AccountDeserializationError, Result}; @@ -302,14 +302,48 @@ fn create_tree_accounts( metadata.associated_queue.into(), tree_type, metadata.rollover_metadata.rolledover_slot != u64::MAX, + metadata.access_metadata.owner.into(), ); trace!( - "{:?} Merkle Tree account found. Pubkey: {}. Queue pubkey: {}. Rolledover: {}", + "{:?} Merkle Tree account found. Pubkey: {}. Queue pubkey: {}. Rolledover: {}. Owner: {}", tree_type, pubkey, tree_accounts.queue, - tree_accounts.is_rolledover + tree_accounts.is_rolledover, + tree_accounts.owner ); tree_accounts } + +pub fn get_registered_program_pda(program_id: &Pubkey) -> Pubkey { + Pubkey::find_program_address( + &[program_id.to_bytes().as_slice()], + &account_compression::ID, + ) + .0 +} + +pub async fn fetch_protocol_group_authority(rpc: &R) -> Result { + let registered_program_pda = get_registered_program_pda(&light_registry::ID); + + let account = rpc + .get_account(registered_program_pda) + .await? 
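+        // this PDA is seeded with light_registry::ID under the account compression program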
+ .ok_or_else(|| { + anyhow::anyhow!( + "RegisteredProgram PDA not found for light_registry at {}", + registered_program_pda + ) + })?; + + let registered_program = RegisteredProgram::deserialize(&mut &account.data[8..]) + .map_err(|e| anyhow::anyhow!("Failed to deserialize RegisteredProgram: {}", e))?; + + info!( + "Fetched protocol group authority: {}", + registered_program.group_authority_pda + ); + + Ok(registered_program.group_authority_pda) +} diff --git a/forester/static/dashboard.html b/forester/static/dashboard.html new file mode 100644 index 0000000000..0a1ac7ad98 --- /dev/null +++ b/forester/static/dashboard.html @@ -0,0 +1,1219 @@ + + + + + + Forester Dashboard + + + +
+    <!-- Forester Dashboard: page header, "Loading" indicator, "Loading status..." panel -->
+ + + + diff --git a/forester/test.sh b/forester/test.sh deleted file mode 100644 index fed0b252e5..0000000000 --- a/forester/test.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -../cli/test_bin/run start-prover --run-mode forester -../cli/test_bin/run test-validator --skip-prover --skip-indexer -sleep 10 -(cd ../../photon && cargo run 2>&1 > photon.log) - -sleep 60 * 5 - -RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester test_state_indexer_async_batched -- --nocapture diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 31c950c022..185ab2ed3b 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -241,6 +241,7 @@ async fn e2e_test() { sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, queue_polling_mode: Default::default(), + group_authority: None, }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index 96475c429e..1c1b5e68eb 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -89,6 +89,8 @@ async fn test_priority_fee_request() { tree_ids: vec![], enable_compressible: true, lookup_table_address: None, + api_server_port: 8080, + group_authority: None, }; let config = ForesterConfig::new_for_start(&args).expect("Failed to create config"); diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index 5c1ce7a4b7..1ae2f159d7 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -115,6 +115,7 @@ pub fn forester_config() -> ForesterConfig { sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, queue_polling_mode: QueuePollingMode::OnChain, + group_authority: None, }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/program-tests/registry-test/tests/tests.rs b/program-tests/registry-test/tests/tests.rs index fd659a5302..b74a64ae88 100644 --- a/program-tests/registry-test/tests/tests.rs +++ b/program-tests/registry-test/tests/tests.rs @@ -876,12 +876,14 @@ async fn test_register_and_update_forester_pda() { merkle_tree: env.v1_state_trees[0].merkle_tree, queue: env.v1_state_trees[0].nullifier_queue, is_rolledover: false, + owner: Default::default(), }, TreeAccounts { tree_type: TreeType::AddressV1, merkle_tree: env.v1_address_trees[0].merkle_tree, queue: env.v1_address_trees[0].queue, is_rolledover: false, + owner: Default::default(), }, ]; diff --git a/program-tests/utils/src/e2e_test_env.rs b/program-tests/utils/src/e2e_test_env.rs index 6aefb38f94..5692eaf5e2 100644 --- a/program-tests/utils/src/e2e_test_env.rs +++ b/program-tests/utils/src/e2e_test_env.rs @@ -1129,6 +1129,7 @@ where merkle_tree: state_merkle_tree_bundle.accounts.merkle_tree, queue: state_merkle_tree_bundle.accounts.nullifier_queue, is_rolledover: false, + owner: Default::default(), } }) .collect::>(); @@ -1139,6 +1140,7 @@ where merkle_tree: address_merkle_tree_bundle.accounts.merkle_tree, queue: address_merkle_tree_bundle.accounts.queue, is_rolledover: false, + owner: Default::default(), }); }, ); diff --git a/program-tests/utils/src/setup_forester.rs b/program-tests/utils/src/setup_forester.rs index b82698212d..78d9e9e939 100644 --- a/program-tests/utils/src/setup_forester.rs +++ b/program-tests/utils/src/setup_forester.rs @@ -56,12 +56,14 @@ pub async fn setup_forester_and_advance_to_epoch( test_keypairs.nullifier_queue.pubkey(), TreeType::StateV1, false, + Default::default(), ), TreeAccounts::new( test_keypairs.address_merkle_tree.pubkey(), 
test_keypairs.address_merkle_tree_queue.pubkey(), TreeType::AddressV1, false, + Default::default(), ), ]; diff --git a/sdk-libs/client/src/rpc/client.rs b/sdk-libs/client/src/rpc/client.rs index 8a3226c72e..09dabfa7cb 100644 --- a/sdk-libs/client/src/rpc/client.rs +++ b/sdk-libs/client/src/rpc/client.rs @@ -466,6 +466,37 @@ impl Rpc for LightClient { .await } + async fn get_program_accounts_with_discriminator( + &self, + program_id: &Pubkey, + discriminator: &[u8], + ) -> Result, RpcError> { + use solana_rpc_client_api::{ + config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + filter::{Memcmp, RpcFilterType}, + }; + + let discriminator = discriminator.to_vec(); + self.retry(|| async { + let config = RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_base58_encoded( + 0, + &discriminator, + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(solana_account_decoder_client_types::UiAccountEncoding::Base64), + commitment: Some(self.client.commitment()), + ..Default::default() + }, + ..Default::default() + }; + self.client + .get_program_accounts_with_config(program_id, config) + .map_err(RpcError::from) + }) + .await + } + async fn process_transaction( &mut self, transaction: Transaction, diff --git a/sdk-libs/client/src/rpc/rpc_trait.rs b/sdk-libs/client/src/rpc/rpc_trait.rs index 2ece7386fd..104c32d51e 100644 --- a/sdk-libs/client/src/rpc/rpc_trait.rs +++ b/sdk-libs/client/src/rpc/rpc_trait.rs @@ -97,6 +97,13 @@ pub trait Rpc: Send + Sync + Debug + 'static { &self, program_id: &Pubkey, ) -> Result, RpcError>; + + async fn get_program_accounts_with_discriminator( + &self, + program_id: &Pubkey, + discriminator: &[u8], + ) -> Result, RpcError>; + // TODO: add send transaction with config async fn confirm_transaction(&self, signature: Signature) -> Result; diff --git a/sdk-libs/program-test/src/program_test/rpc.rs b/sdk-libs/program-test/src/program_test/rpc.rs index 3172d7f829..f87e89703f 100644 --- a/sdk-libs/program-test/src/program_test/rpc.rs +++ b/sdk-libs/program-test/src/program_test/rpc.rs @@ -63,6 +63,21 @@ impl Rpc for LightProgramTest { Ok(self.context.get_program_accounts(program_id)) } + async fn get_program_accounts_with_discriminator( + &self, + program_id: &Pubkey, + discriminator: &[u8], + ) -> Result, RpcError> { + let all_accounts = self.context.get_program_accounts(program_id); + Ok(all_accounts + .into_iter() + .filter(|(_, account)| { + account.data.len() >= discriminator.len() + && &account.data[..discriminator.len()] == discriminator + }) + .collect()) + } + async fn confirm_transaction(&self, _transaction: Signature) -> Result { Ok(true) } From f35d96caf6dffcc9098676c7c0b2b348320bb8c0 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Wed, 7 Jan 2026 11:32:55 +0000 Subject: [PATCH 02/15] format --- Cargo.lock | 178 ++--------------------------------------------------- 1 file changed, 5 insertions(+), 173 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df88d28c3b..25190c61aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -865,28 +865,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.111", -] - [[package]] name = "async-trait" version = "0.1.89" @@ -2218,12 +2196,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "fallible-iterator" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - [[package]] name = "fastbloom" version = "0.14.0" @@ -2395,6 +2367,7 @@ dependencies = [ "serial_test", "solana-account-decoder", "solana-client", + "solana-commitment-config", "solana-program", "solana-pubkey 2.4.0", "solana-rpc-client-api", @@ -2414,12 +2387,8 @@ version = "2.0.0" dependencies = [ "account-compression", "anchor-lang", - "anyhow", - "async-stream", "async-trait", "bb8", - "bs58", - "futures", "governor 0.8.1", "light-account-checks", "light-batched-merkle-tree", @@ -2428,25 +2397,19 @@ dependencies = [ "light-concurrent-merkle-tree", "light-hash-set", "light-hasher", - "light-indexed-array", "light-indexed-merkle-tree", - "light-merkle-tree-metadata", "light-merkle-tree-reference", "light-prover-client", "light-registry", "light-sdk", "light-sparse-merkle-tree", "light-token-interface", - "num-bigint 0.4.6", "num-traits", - "serde", - "serde_json", "solana-instruction", "solana-pubkey 2.4.0", "solana-sdk", "thiserror 2.0.17", "tokio", - "tokio-postgres", "tracing", ] @@ -2989,6 +2952,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -3450,7 +3414,6 @@ checksum = "df15f6eac291ed1cf25865b1ee60399f57e7c227e7f51bdbd4c5270396a9ed50" dependencies = [ "bitflags 2.10.0", "libc", - "redox_syscall 0.6.0", ] [[package]] @@ -4444,16 +4407,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "md-5" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" -dependencies = [ - "cfg-if", - "digest 0.10.7", -] - [[package]] name = "memchr" version = "2.7.6" @@ -4902,7 +4855,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.5.18", + "redox_syscall", "smallvec", "windows-link", ] @@ -4955,25 +4908,6 @@ dependencies = [ "num", ] -[[package]] -name = "phf" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf" -dependencies = [ - "phf_shared", - "serde", -] - -[[package]] -name = "phf_shared" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266" -dependencies = [ - "siphasher 1.0.1", -] - [[package]] name = "photon-api" version = "0.53.0" @@ -5121,35 +5055,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "postgres-protocol" -version = "0.6.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbef655056b916eb868048276cfd5d6a7dea4f81560dfd047f97c8c6fe3fcfd4" -dependencies = [ - "base64 0.22.1", - "byteorder", - "bytes", - "fallible-iterator", - "hmac 0.12.1", - "md-5", - "memchr", - "rand 0.9.2", - "sha2 0.10.9", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ef4605b7c057056dd35baeb6ac0c0338e4975b1f2bef0f65da953285eb007095" -dependencies = [ - "bytes", - "fallible-iterator", - "postgres-protocol", -] - [[package]] name = "potential_utf" version = "0.1.4" @@ -5525,15 +5430,6 @@ dependencies = [ "bitflags 2.10.0", ] -[[package]] -name = "redox_syscall" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96166dafa0886eb81fe1c0a388bece180fbef2135f97c1e2cf8302e74b43b5" -dependencies = [ - "bitflags 2.10.0", -] - [[package]] name = "redox_users" version = "0.4.6" @@ -10055,17 +9951,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "stringprep" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" -dependencies = [ - "unicode-bidi", - "unicode-normalization", - "unicode-properties", -] - [[package]] name = "strsim" version = "0.8.0" @@ -10549,32 +10434,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-postgres" -version = "0.7.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b40d66d9b2cfe04b628173409368e58247e8eddbbd3b0e6c6ba1d09f20f6c9e" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator", - "futures-channel", - "futures-util", - "log", - "parking_lot", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "rand 0.9.2", - "socket2 0.6.1", - "tokio", - "tokio-util 0.7.17", - "whoami", -] - [[package]] name = "tokio-rustls" version = "0.24.1" @@ -10970,12 +10829,6 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.22" @@ -10991,12 +10844,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-properties" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" - [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -11165,6 +11012,8 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", + "hyper 1.8.1", + "hyper-util", "log", "mime", "mime_guess", @@ -11201,12 +11050,6 @@ dependencies = [ "wit-bindgen", ] -[[package]] -name = "wasite" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" - [[package]] name = "wasm-bindgen" version = "0.2.106" @@ -11318,17 +11161,6 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "whoami" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" -dependencies = [ - "libredox", - "wasite", - "web-sys", -] - [[package]] name = "winapi" version = "0.3.9" From 1528f1b9de61d39515c6ec01606f1e61c70d276a Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Wed, 7 Jan 2026 11:40:42 +0000 Subject: [PATCH 03/15] import CommitmentLevel from solana_commitment_config instead of solana_sdk --- 
forester/src/processor/v1/send_transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 67d1de2fa8..22b7bcd913 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -13,8 +13,8 @@ use light_compressed_account::TreeType; use light_registry::utils::get_forester_epoch_pda_from_authority; use reqwest::Url; use solana_client::rpc_config::RpcSendTransactionConfig; +use solana_commitment_config::CommitmentLevel; use solana_sdk::{ - commitment_config::CommitmentLevel, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signature, Signer}, From cb725c3b8e351b10d2a55739fdfdd6e07a013508 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Wed, 7 Jan 2026 12:08:45 +0000 Subject: [PATCH 04/15] create static directory in forester's Dockerfile and copy static files --- forester/Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/forester/Dockerfile b/forester/Dockerfile index 462de1ecb4..79b3f06f36 100644 --- a/forester/Dockerfile +++ b/forester/Dockerfile @@ -14,8 +14,9 @@ RUN cargo build --release --package forester FROM debian:bookworm-slim RUN apt-get update && apt-get install -y ca-certificates libssl3 && rm -rf /var/lib/apt/lists/* -RUN mkdir -p /app/config +RUN mkdir -p /app/config /app/static COPY --from=builder /app/target/release/forester /usr/local/bin/forester +COPY --from=builder /app/forester/static /app/static WORKDIR /app ENTRYPOINT ["/usr/local/bin/forester"] From 3f9c38849dc55cf4e8a1fd46d8926ef3e0e4ea9c Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 9 Jan 2026 13:52:02 +0000 Subject: [PATCH 05/15] feat: enhance epoch monitoring and metrics tracking; improve error handling and logging --- forester/src/epoch_manager.rs | 126 ++++++++++--- forester/src/metrics.rs | 47 +++++ forester/src/processor/v1/helpers.rs | 170 +++++++++++++----- forester/src/processor/v1/send_transaction.rs | 18 +- forester/src/processor/v2/processor.rs | 3 +- forester/src/processor/v2/tx_sender.rs | 33 +++- 6 files changed, 320 insertions(+), 77 deletions(-) diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 12abd9b5d3..34211c3b16 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -282,7 +282,7 @@ impl EpochManager { let (tx, mut rx) = mpsc::channel(100); let tx = Arc::new(tx); - let monitor_handle = tokio::spawn({ + let mut monitor_handle = tokio::spawn({ let self_clone = Arc::clone(&self); let tx_clone = Arc::clone(&tx); async move { self_clone.monitor_epochs(tx_clone).await } @@ -311,32 +311,63 @@ impl EpochManager { let _guard = scopeguard::guard( ( - monitor_handle, current_previous_handle, new_tree_handle, balance_check_handle, ), - |(h1, h2, h3, h4)| { + |(h2, h3, h4)| { info!("Aborting EpochManager background tasks"); - h1.abort(); h2.abort(); h3.abort(); h4.abort(); }, ); - while let Some(epoch) = rx.recv().await { - debug!("Received new epoch: {}", epoch); - - let self_clone = Arc::clone(&self); - tokio::spawn(async move { - if let Err(e) = self_clone.process_epoch(epoch).await { - error!("Error processing epoch {}: {:?}", epoch, e); + loop { + tokio::select! 
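+                // race the epoch channel against the monitor task handle so a dead
+                // monitor is detected, optionally alerted on via PagerDuty, and treated as fatal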
{ + epoch_opt = rx.recv() => { + match epoch_opt { + Some(epoch) => { + debug!("Received new epoch: {}", epoch); + let self_clone = Arc::clone(&self); + tokio::spawn(async move { + if let Err(e) = self_clone.process_epoch(epoch).await { + error!("Error processing epoch {}: {:?}", epoch, e); + } + }); + } + None => { + error!("Epoch monitor channel closed unexpectedly!"); + break; + } + } } - }); + result = &mut monitor_handle => { + match result { + Ok(Ok(())) => { + error!("Epoch monitor exited unexpectedly with Ok(())"); + } + Ok(Err(e)) => { + error!("Epoch monitor exited with error: {:?}", e); + } + Err(e) => { + error!("Epoch monitor task panicked or was cancelled: {:?}", e); + } + } + if let Some(pagerduty_key) = &self.config.external_services.pagerduty_routing_key { + let _ = send_pagerduty_alert( + pagerduty_key, + "critical", + &format!("Forester epoch monitor died unexpectedly on {}", self.config.payer_keypair.pubkey()), + "epoch_monitor_dead", + ).await; + } + return Err(anyhow!("Epoch monitor exited unexpectedly - forester cannot function without it")); + } + } } - Ok(()) + Err(anyhow!("Epoch monitor channel closed - forester cannot function without it")) } async fn check_sol_balance_periodically(self: Arc) -> Result<()> { @@ -461,10 +492,47 @@ impl EpochManager { #[instrument(level = "debug", skip(self, tx))] async fn monitor_epochs(&self, tx: Arc>) -> Result<()> { let mut last_epoch: Option = None; - debug!("Starting epoch monitor"); + let mut consecutive_failures = 0u32; + const MAX_BACKOFF_SECS: u64 = 60; + + info!("Starting epoch monitor"); loop { - let (slot, current_epoch) = self.get_current_slot_and_epoch().await?; + let (slot, current_epoch) = match self.get_current_slot_and_epoch().await { + Ok(result) => { + if consecutive_failures > 0 { + info!( + "Epoch monitor recovered after {} consecutive failures", + consecutive_failures + ); + } + consecutive_failures = 0; + result + } + Err(e) => { + consecutive_failures += 1; + let backoff_secs = 2u64.pow(consecutive_failures.min(6)).min(MAX_BACKOFF_SECS); + let backoff = Duration::from_secs(backoff_secs); + + if consecutive_failures == 1 { + warn!( + "Epoch monitor: failed to get slot/epoch: {:?}. Retrying in {:?}", + e, backoff + ); + } else { + if consecutive_failures % 10 == 0 { + error!( + "Epoch monitor: {} consecutive failures, last error: {:?}. Still retrying every {:?}", + consecutive_failures, e, backoff + ); + } + } + + tokio::time::sleep(backoff).await; + continue; + } + }; + debug!( "last_epoch: {:?}, current_epoch: {:?}, slot: {:?}", last_epoch, current_epoch, slot @@ -477,10 +545,10 @@ impl EpochManager { debug!("Sending current epoch {} for processing", current_epoch); if let Err(e) = tx.send(current_epoch).await { error!( - "Failed to send current epoch {} for processing: {:?}", + "Failed to send current epoch {} for processing: {:?}. Channel closed, exiting.", current_epoch, e ); - return Ok(()); + return Err(anyhow!("Epoch channel closed: {}", e)); } last_epoch = Some(current_epoch); } @@ -612,10 +680,12 @@ impl EpochManager { let forester_epoch_pda_pubkey = get_forester_epoch_pda_from_authority(&self.config.derivation_pubkey, epoch).0; - let rpc = self.rpc_pool.get_connection().await?; - let existing_pda = rpc - .get_anchor_account::(&forester_epoch_pda_pubkey) - .await?; + + let existing_pda = { + let rpc = self.rpc_pool.get_connection().await?; + rpc.get_anchor_account::(&forester_epoch_pda_pubkey) + .await? 
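+            // the pooled connection is dropped at the end of this block instead of
+            // being held across the PDA recovery logic below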
+ }; existing_pda .map(|pda| async move { @@ -698,12 +768,15 @@ impl EpochManager { #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch ))] async fn process_epoch(&self, epoch: u64) -> Result<()> { - info!("Entering process_epoch"); - + + // Clone the Arc immediately to release the DashMap shard lock. + // Without .clone(), the RefMut guard would be held across async operations, + // blocking other epochs from accessing the DashMap if they hash to the same shard. let processing_flag = self .processing_epochs .entry(epoch) - .or_insert_with(|| Arc::new(AtomicBool::new(false))); + .or_insert_with(|| Arc::new(AtomicBool::new(false))) + .clone(); if processing_flag .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) @@ -713,6 +786,7 @@ impl EpochManager { debug!("Epoch {} is already being processed, skipping", epoch); return Ok(()); } + let phases = get_epoch_phases(&self.protocol_config, epoch); // Attempt to recover registration info @@ -1214,7 +1288,9 @@ impl EpochManager { handles.push(handle); } - for result in join_all(handles).await { + info!("Waiting for {} tree processing tasks", handles.len()); + let results = join_all(handles).await; + for result in results { match result { Ok(Ok(())) => { debug!("Queue processed successfully"); diff --git a/forester/src/metrics.rs b/forester/src/metrics.rs index 9b01c33cee..9429e45467 100644 --- a/forester/src/metrics.rs +++ b/forester/src/metrics.rs @@ -81,6 +81,28 @@ lazy_static! { error!("Failed to create metric REGISTERED_FORESTERS: {:?}", e); std::process::exit(1); }); + pub static ref INDEXER_RESPONSE_TIME: GaugeVec = GaugeVec::new( + prometheus::opts!( + "forester_indexer_response_time_seconds", + "Response time for indexer proof requests in seconds" + ), + &["operation", "tree_type"] + ) + .unwrap_or_else(|e| { + error!("Failed to create metric INDEXER_RESPONSE_TIME: {:?}", e); + std::process::exit(1); + }); + pub static ref INDEXER_PROOF_COUNT: IntGaugeVec = IntGaugeVec::new( + prometheus::opts!( + "forester_indexer_proof_count", + "Number of proofs requested vs received from indexer" + ), + &["tree_type", "metric"] + ) + .unwrap_or_else(|e| { + error!("Failed to create metric INDEXER_PROOF_COUNT: {:?}", e); + std::process::exit(1); + }); static ref METRIC_UPDATES: Mutex> = Mutex::new(Vec::new()); } @@ -109,6 +131,12 @@ pub fn register_metrics() { if let Err(e) = REGISTRY.register(Box::new(REGISTERED_FORESTERS.clone())) { error!("Failed to register metric REGISTERED_FORESTERS: {:?}", e); } + if let Err(e) = REGISTRY.register(Box::new(INDEXER_RESPONSE_TIME.clone())) { + error!("Failed to register metric INDEXER_RESPONSE_TIME: {:?}", e); + } + if let Err(e) = REGISTRY.register(Box::new(INDEXER_PROOF_COUNT.clone())) { + error!("Failed to register metric INDEXER_PROOF_COUNT: {:?}", e); + } }); } @@ -180,6 +208,25 @@ pub fn update_registered_foresters(epoch: u64, authority: &str) { .set(1.0); } +pub fn update_indexer_response_time(operation: &str, tree_type: &str, duration_secs: f64) { + INDEXER_RESPONSE_TIME + .with_label_values(&[operation, tree_type]) + .set(duration_secs); + debug!( + "Indexer {} for {} took {:.3}s", + operation, tree_type, duration_secs + ); +} + +pub fn update_indexer_proof_count(tree_type: &str, requested: i64, received: i64) { + INDEXER_PROOF_COUNT + .with_label_values(&[tree_type, "requested"]) + .set(requested); + INDEXER_PROOF_COUNT + .with_label_values(&[tree_type, "received"]) + .set(received); +} + pub async fn push_metrics(url: &Option) -> 
Result<()> { let url = match url { Some(url) => url, diff --git a/forester/src/processor/v1/helpers.rs b/forester/src/processor/v1/helpers.rs index 1b4e4609f6..6598c6d394 100644 --- a/forester/src/processor/v1/helpers.rs +++ b/forester/src/processor/v1/helpers.rs @@ -8,10 +8,7 @@ use account_compression::{ }, }; use forester_utils::{rpc_pool::SolanaRpcPool, utils::wait_for_indexer}; -use light_client::{ - indexer::{Indexer, Items, MerkleProof, NewAddressProofWithContext}, - rpc::Rpc, -}; +use light_client::{indexer::Indexer, rpc::Rpc}; use light_compressed_account::TreeType; use light_registry::account_compression_cpi::sdk::{ create_nullify_instruction, create_update_address_merkle_tree_instruction, @@ -19,8 +16,12 @@ use light_registry::account_compression_cpi::sdk::{ }; use reqwest::Url; use solana_program::instruction::Instruction; -use tokio::join; -use tracing::warn; +use tokio::time::Instant; +use tracing::{info, warn}; + +use crate::metrics::{update_indexer_proof_count, update_indexer_response_time}; + +const ADDRESS_PROOF_BATCH_SIZE: usize = 100; use crate::{ epoch_manager::{MerkleProofType, WorkItem}, @@ -93,48 +94,133 @@ pub async fn fetch_proofs_and_create_instructions( warn!("Indexer not fully caught up, but proceeding anyway: {}", e); } - let (address_proofs_result, state_proofs_result) = { - let address_future = async { - if let Some((merkle_tree, addresses)) = address_data { - rpc.indexer()? - .get_multiple_new_address_proofs(merkle_tree, addresses, None) - .await - } else { - Ok(light_client::indexer::Response { - context: light_client::indexer::Context::default(), - value: Items::::default(), - }) - } - }; - - let state_future = async { - if let Some(states) = state_data { - rpc.indexer()? - .get_multiple_compressed_account_proofs(states, None) - .await - } else { - Ok(light_client::indexer::Response { - context: light_client::indexer::Context::default(), - value: Items::::default(), - }) - } - }; + let address_proofs = if let Some((merkle_tree, addresses)) = address_data { + let total_addresses = addresses.len(); + info!( + "Fetching {} address proofs in batches of {}", + total_addresses, ADDRESS_PROOF_BATCH_SIZE + ); - join!(address_future, state_future) - }; + let start_time = Instant::now(); + let mut all_proofs = Vec::with_capacity(total_addresses); + + for (batch_idx, batch) in addresses.chunks(ADDRESS_PROOF_BATCH_SIZE).enumerate() { + let batch_start = Instant::now(); + let batch_addresses: Vec<[u8; 32]> = batch.to_vec(); + let batch_size = batch_addresses.len(); + + match rpc + .indexer()? 
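+                    // one indexer request per chunk of ADDRESS_PROOF_BATCH_SIZE addresses;
+                    // each batch is timed and its proof count checked below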
+ .get_multiple_new_address_proofs(merkle_tree, batch_addresses, None) + .await + { + Ok(response) => { + let batch_duration = batch_start.elapsed(); + let proofs_received = response.value.items.len(); + + info!( + "Address proof batch {}: requested={}, received={}, duration={:.3}s", + batch_idx, + batch_size, + proofs_received, + batch_duration.as_secs_f64() + ); + + if proofs_received != batch_size { + warn!( + "Address proof count mismatch in batch {}: requested={}, received={}", + batch_idx, batch_size, proofs_received + ); + } - let address_proofs = match address_proofs_result { - Ok(response) => response.value.items, - Err(e) => { - return Err(anyhow::anyhow!("Failed to get address proofs: {}", e)); + all_proofs.extend(response.value.items); + } + Err(e) => { + let batch_duration = batch_start.elapsed(); + warn!( + "Failed to get address proofs for batch {} after {:.3}s: {}", + batch_idx, + batch_duration.as_secs_f64(), + e + ); + return Err(anyhow::anyhow!( + "Failed to get address proofs for batch {}: {}", + batch_idx, + e + )); + } + } } + + let total_duration = start_time.elapsed(); + info!( + "Address proofs complete: requested={}, received={}, total_duration={:.3}s", + total_addresses, + all_proofs.len(), + total_duration.as_secs_f64() + ); + + update_indexer_response_time( + "get_multiple_new_address_proofs", + "AddressV1", + total_duration.as_secs_f64(), + ); + update_indexer_proof_count("AddressV1", total_addresses as i64, all_proofs.len() as i64); + + all_proofs + } else { + Vec::new() }; - let state_proofs = match state_proofs_result { - Ok(response) => response.value.items, - Err(e) => { - return Err(anyhow::anyhow!("Failed to get state proofs: {}", e)); + let state_proofs = if let Some(states) = state_data { + let total_states = states.len(); + info!("Fetching {} state proofs", total_states); + + let start_time = Instant::now(); + match rpc + .indexer()? 
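+            // state proofs are still fetched in a single request; only timing and
+            // requested-vs-received metrics are recorded around the call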
+ .get_multiple_compressed_account_proofs(states, None) + .await + { + Ok(response) => { + let duration = start_time.elapsed(); + let proofs_received = response.value.items.len(); + + info!( + "State proofs complete: requested={}, received={}, duration={:.3}s", + total_states, + proofs_received, + duration.as_secs_f64() + ); + + if proofs_received != total_states { + warn!( + "State proof count mismatch: requested={}, received={}", + total_states, proofs_received + ); + } + + update_indexer_response_time( + "get_multiple_compressed_account_proofs", + "StateV1", + duration.as_secs_f64(), + ); + update_indexer_proof_count("StateV1", total_states as i64, proofs_received as i64); + + response.value.items + } + Err(e) => { + let duration = start_time.elapsed(); + warn!( + "Failed to get state proofs after {:.3}s: {}", + duration.as_secs_f64(), + e + ); + return Err(anyhow::anyhow!("Failed to get state proofs: {}", e)); + } } + } else { + Vec::new() }; for (item, proof) in address_items.iter().zip(address_proofs.into_iter()) { diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 22b7bcd913..7fc27261f6 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -21,7 +21,9 @@ use solana_sdk::{ transaction::Transaction, }; use tokio::time::Instant; -use tracing::{error, trace, warn}; +use tracing::{error, info, trace, warn}; + +const WORK_ITEM_BATCH_SIZE: usize = 100; use crate::{ epoch_manager::WorkItem, @@ -90,12 +92,16 @@ pub async fn send_batched_transactions TxSender { break; } - let result = match proof_rx.recv().await { - Some(r) => r, - None => break, + let current_slot = self.context.slot_tracker.estimated_current_slot(); + if !self.is_still_eligible_at(current_slot) { + let proofs_saved = self.save_proofs_to_cache(&mut proof_rx, None).await; + info!( + "Active phase ended for epoch {}, stopping tx sender before recv (saved {} proofs to cache)", + self.context.epoch, proofs_saved + ); + drop(batch_tx); + let (items_processed, tx_sending_duration) = sender_handle + .await + .map_err(|e| anyhow::anyhow!("Sender panic: {}", e))??; + return Ok(TxSenderResult { + items_processed, + proof_timings: self.proof_timings, + proofs_saved_to_cache: proofs_saved, + tx_sending_duration, + }); + } + + let result = tokio::select! 
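+            // biased toward the 1s tick so the eligibility check at the top of the
+            // loop keeps running while proof_rx is idle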
{ + biased; + _ = tokio::time::sleep(Duration::from_secs(1)) => { + continue; + } + result = proof_rx.recv() => { + match result { + Some(r) => r, + None => break, + } + } }; let current_slot = self.context.slot_tracker.estimated_current_slot(); From 9ca0bb1733a92a84fb1dbcd5fe0b634b355df6de Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 9 Jan 2026 13:55:48 +0000 Subject: [PATCH 06/15] refactor: replace blocking status retrieval with async --- forester/src/api_server.rs | 19 ++++--------------- forester/src/forester_status.rs | 8 +------- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs index 637f611330..ee0dba8339 100644 --- a/forester/src/api_server.rs +++ b/forester/src/api_server.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use tracing::{error, info}; use warp::{http::Response, Filter}; -use crate::{forester_status::get_forester_status_blocking, metrics::REGISTRY}; +use crate::{forester_status::get_forester_status, metrics::REGISTRY}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HealthResponse { @@ -49,26 +49,15 @@ pub fn spawn_api_server(rpc_url: String, port: u16) { let status_route = warp::path("status").and(warp::get()).and_then(move || { let rpc_url = rpc_url.clone(); async move { - match tokio::task::spawn_blocking(move || { - get_forester_status_blocking(&rpc_url) - }) - .await - { - Ok(Ok(status)) => Ok::<_, warp::Rejection>(warp::reply::json(&status)), - Ok(Err(e)) => { + match get_forester_status(&rpc_url).await { + Ok(status) => Ok::<_, warp::Rejection>(warp::reply::json(&status)), + Err(e) => { error!("Failed to get forester status: {:?}", e); let error_response = ErrorResponse { error: format!("Failed to get forester status: {}", e), }; Ok(warp::reply::json(&error_response)) } - Err(e) => { - error!("Task join error: {:?}", e); - let error_response = ErrorResponse { - error: format!("Task join error: {}", e), - }; - Ok(warp::reply::json(&error_response)) - } } } }); diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 756274b6e9..3af7e0d8e9 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -85,13 +85,7 @@ pub struct TreeStatus { pub owner: String, } -pub fn get_forester_status_blocking(rpc_url: &str) -> crate::Result { - tokio::runtime::Runtime::new() - .context("Failed to create tokio runtime")? 
- .block_on(get_forester_status_async(rpc_url)) -} - -async fn get_forester_status_async(rpc_url: &str) -> crate::Result { +pub async fn get_forester_status(rpc_url: &str) -> crate::Result { let rpc = LightClient::new(LightClientConfig { url: rpc_url.to_string(), photon_url: None, From 30c552487c93f7e30b7e46d3359494529be111d4 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 9 Jan 2026 14:46:46 +0000 Subject: [PATCH 07/15] feat: enhance error handling in API server and metrics retrieval; improve dashboard script for better slot timing --- forester/src/api_server.rs | 37 ++++++--- forester/src/forester_status.rs | 64 +++++++++++----- forester/src/queue_helpers.rs | 129 +++++++++++++++++--------------- forester/src/tree_data_sync.rs | 11 +-- forester/static/dashboard.html | 20 +++-- 5 files changed, 160 insertions(+), 101 deletions(-) diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs index ee0dba8339..45a5ab8ced 100644 --- a/forester/src/api_server.rs +++ b/forester/src/api_server.rs @@ -29,7 +29,13 @@ const DASHBOARD_HTML: &str = include_str!("../static/dashboard.html"); pub fn spawn_api_server(rpc_url: String, port: u16) { std::thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); + let rt = match tokio::runtime::Runtime::new() { + Ok(rt) => rt, + Err(e) => { + error!("Failed to create tokio runtime for API server: {}", e); + return; + } + }; rt.block_on(async move { let addr = SocketAddr::from(([0, 0, 0, 0], port)); info!("Starting HTTP API server on {}", addr); @@ -62,28 +68,41 @@ pub fn spawn_api_server(rpc_url: String, port: u16) { } }); - let metrics_route = warp::path!("metrics" / "json").and(warp::get()).map(|| { - let metrics = get_metrics_json(); - warp::reply::json(&metrics) - }); + let metrics_route = + warp::path!("metrics" / "json") + .and(warp::get()) + .and_then(|| async move { + match get_metrics_json() { + Ok(metrics) => Ok::<_, warp::Rejection>(warp::reply::json(&metrics)), + Err(e) => { + error!("Failed to encode metrics: {}", e); + let error_response = ErrorResponse { + error: format!("Failed to encode metrics: {}", e), + }; + Ok(warp::reply::json(&error_response)) + } + } + }); let routes = dashboard_route .or(health_route) .or(status_route) .or(metrics_route); + // warp::serve().run() will panic if binding fails. + // The panic message will be logged by the thread. 
warp::serve(routes).run(addr).await; }); }); } -fn get_metrics_json() -> MetricsResponse { +fn get_metrics_json() -> Result { use prometheus::Encoder; let encoder = prometheus::TextEncoder::new(); let metric_families = REGISTRY.gather(); let mut buffer = Vec::new(); - let _ = encoder.encode(&metric_families, &mut buffer); + encoder.encode(&metric_families, &mut buffer)?; let text = String::from_utf8_lossy(&buffer); let mut transactions_processed: HashMap = HashMap::new(); @@ -122,13 +141,13 @@ fn get_metrics_json() -> MetricsResponse { } } - MetricsResponse { + Ok(MetricsResponse { transactions_processed_total: transactions_processed, transaction_rate, last_run_timestamp, forester_balances, queue_lengths, - } + }) } fn extract_label(metric_part: &str, label_name: &str) -> Option { diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 3af7e0d8e9..95c330c66b 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -258,7 +258,16 @@ pub async fn get_forester_status(rpc_url: &str) -> crate::Result // Build full schedule for each tree for status in &mut tree_statuses { - let queue_pubkey: Pubkey = status.queue.parse().unwrap_or_default(); + let queue_pubkey: Pubkey = match status.queue.parse() { + Ok(pk) => pk, + Err(e) => { + warn!( + "Failed to parse queue pubkey '{}': {}, skipping schedule computation", + status.queue, e + ); + continue; + } + }; let mut schedule: Vec> = Vec::with_capacity(total_light_slots as usize); @@ -329,31 +338,46 @@ async fn fetch_registry_accounts_filtered( let mut epoch_pdas = Vec::new(); let mut protocol_config_pdas = Vec::new(); - if let Ok(accounts) = forester_result { - for (_, account) in accounts { - let mut data: &[u8] = &account.data; - if let Ok(pda) = ForesterEpochPda::try_deserialize_unchecked(&mut data) { - forester_epoch_pdas.push(pda); + match forester_result { + Ok(accounts) => { + for (_, account) in accounts { + let mut data: &[u8] = &account.data; + if let Ok(pda) = ForesterEpochPda::try_deserialize_unchecked(&mut data) { + forester_epoch_pdas.push(pda); + } } } + Err(e) => { + warn!("Failed to fetch forester epoch accounts: {:?}", e); + } } - if let Ok(accounts) = epoch_result { - for (_, account) in accounts { - let mut data: &[u8] = &account.data; - if let Ok(pda) = EpochPda::try_deserialize_unchecked(&mut data) { - epoch_pdas.push(pda); + match epoch_result { + Ok(accounts) => { + for (_, account) in accounts { + let mut data: &[u8] = &account.data; + if let Ok(pda) = EpochPda::try_deserialize_unchecked(&mut data) { + epoch_pdas.push(pda); + } } } + Err(e) => { + warn!("Failed to fetch epoch accounts: {:?}", e); + } } - if let Ok(accounts) = config_result { - for (_, account) in accounts { - let mut data: &[u8] = &account.data; - if let Ok(pda) = ProtocolConfigPda::try_deserialize_unchecked(&mut data) { - protocol_config_pdas.push(pda); + match config_result { + Ok(accounts) => { + for (_, account) in accounts { + let mut data: &[u8] = &account.data; + if let Ok(pda) = ProtocolConfigPda::try_deserialize_unchecked(&mut data) { + protocol_config_pdas.push(pda); + } } } + Err(e) => { + warn!("Failed to fetch protocol config accounts: {:?}", e); + } } forester_epoch_pdas.sort_by(|a, b| a.epoch.cmp(&b.epoch)); @@ -577,7 +601,13 @@ fn parse_tree_status( Some(v2_info), ) } - TreeType::Unknown => (0.0, 0, 0, None, None), + TreeType::Unknown => { + warn!( + "Encountered unknown tree type for merkle_tree={}, queue={}", + tree.merkle_tree, tree.queue + ); + (0.0, 0, 0, None, None) + } }; 
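+        // unknown tree types reach this point with zeroed stats and no queue info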
Ok(TreeStatus { diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index 599269bc59..0b08c6f5cf 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -36,6 +36,52 @@ pub struct BatchInfo { pub items_in_current_zkp_batch: u64, } +#[derive(Debug, Clone)] +pub struct ParsedBatchData { + pub batch_infos: Vec, + pub total_pending_batches: u64, + pub zkp_batch_size: u64, + pub items_in_current_zkp_batch: u64, +} + +pub fn parse_batch_metadata(batches: &[light_batched_merkle_tree::batch::Batch]) -> ParsedBatchData { + use light_batched_merkle_tree::constants::DEFAULT_ZKP_BATCH_SIZE; + + let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; + let mut total_pending_batches = 0u64; + let mut batch_infos = Vec::with_capacity(batches.len()); + let mut items_in_current_zkp_batch = 0u64; + + for (batch_idx, batch) in batches.iter().enumerate() { + zkp_batch_size = batch.zkp_batch_size; + let num_inserted = batch.get_num_inserted_zkps(); + let current_index = batch.get_current_zkp_batch_index(); + let pending_in_batch = current_index.saturating_sub(num_inserted); + + if batch.get_state() == BatchState::Fill { + items_in_current_zkp_batch = batch.get_num_inserted_zkp_batch(); + } + + batch_infos.push(BatchInfo { + batch_index: batch_idx, + state: format!("{:?}", batch.get_state()), + num_inserted, + current_index, + pending: pending_in_batch, + items_in_current_zkp_batch: batch.get_num_inserted_zkp_batch(), + }); + + total_pending_batches += pending_in_batch; + } + + ParsedBatchData { + batch_infos, + total_pending_batches, + zkp_batch_size, + items_in_current_zkp_batch, + } +} + pub fn parse_state_v2_queue_info( merkle_tree: &BatchedMerkleTreeAccount, output_queue_data: &mut [u8], @@ -194,7 +240,7 @@ pub async fn fetch_queue_item_data( .map(|(index, hash, _)| QueueItemData { hash, index }) .collect(); - tracing::info!( + tracing::debug!( "Queue {}: total_items={}, total_pending={}, range={}..{}, filtered_result={}", queue_pubkey, total_items, @@ -215,37 +261,32 @@ pub async fn print_state_v2_output_queue_info( let output_queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; let metadata = output_queue.get_metadata(); let next_index = metadata.batch_metadata.next_index; + let zkp_batch_size = metadata.batch_metadata.zkp_batch_size; - let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; - let mut total_unprocessed = 0; - let mut batch_details = Vec::new(); - let mut total_completed_operations = 0; + let parsed = parse_batch_metadata(&metadata.batch_metadata.batches); - for (batch_idx, batch) in metadata.batch_metadata.batches.iter().enumerate() { - zkp_batch_size = batch.zkp_batch_size; - let num_inserted = batch.get_num_inserted_zkps(); - let current_index = batch.get_current_zkp_batch_index(); - let pending_in_batch = current_index.saturating_sub(num_inserted); + // Calculate completed and pending operations (in items, not batches) + let mut total_completed_operations = 0u64; + let mut total_unprocessed = 0u64; + let mut batch_details = Vec::new(); - let completed_operations_in_batch = - num_inserted * metadata.batch_metadata.zkp_batch_size; + for batch_info in &parsed.batch_infos { + let completed_operations_in_batch = batch_info.num_inserted * zkp_batch_size; total_completed_operations += completed_operations_in_batch; - let pending_operations_in_batch = - pending_in_batch * metadata.batch_metadata.zkp_batch_size; + let pending_operations_in_batch = batch_info.pending * zkp_batch_size; + total_unprocessed += pending_operations_in_batch; 
batch_details.push(format!( - "batch_{}: state={:?}, zkp_inserted={}, zkp_current={}, zkp_pending={}, items_completed={}, items_pending={}", - batch_idx, - batch.get_state(), - num_inserted, - current_index, - pending_in_batch, + "batch_{}: state={}, zkp_inserted={}, zkp_current={}, zkp_pending={}, items_completed={}, items_pending={}", + batch_info.batch_index, + batch_info.state, + batch_info.num_inserted, + batch_info.current_index, + batch_info.pending, completed_operations_in_batch, pending_operations_in_batch )); - - total_unprocessed += pending_operations_in_batch; } println!("StateV2 {} APPEND:", output_queue_pubkey); @@ -259,10 +300,7 @@ pub async fn print_state_v2_output_queue_info( " pending_batch_index: {}", metadata.batch_metadata.pending_batch_index ); - println!( - " zkp_batch_size: {}", - metadata.batch_metadata.zkp_batch_size - ); + println!(" zkp_batch_size: {}", zkp_batch_size); println!( " SUMMARY: {} items added, {} items processed, {} items pending", next_index, total_completed_operations, total_unprocessed @@ -272,7 +310,7 @@ pub async fn print_state_v2_output_queue_info( } println!( " Total pending APPEND operations: {}", - total_unprocessed / zkp_batch_size + parsed.total_pending_batches ); Ok(total_unprocessed as usize) @@ -435,44 +473,17 @@ pub async fn get_state_v2_output_queue_info( let queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; let next_index = queue.batch_metadata.next_index; - let mut zkp_batch_size = DEFAULT_ZKP_BATCH_SIZE; - let mut total_unprocessed = 0; - let mut batch_infos = Vec::new(); - let mut output_items_in_current_zkp_batch = 0u64; - - for (batch_idx, batch) in queue.batch_metadata.batches.iter().enumerate() { - zkp_batch_size = batch.zkp_batch_size; - let num_inserted = batch.get_num_inserted_zkps(); - let current_index = batch.get_current_zkp_batch_index(); - let pending_in_batch = current_index.saturating_sub(num_inserted); - - if batch.get_state() == BatchState::Fill { - output_items_in_current_zkp_batch = batch.get_num_inserted_zkp_batch(); - } - - batch_infos.push(BatchInfo { - batch_index: batch_idx, - state: format!("{:?}", batch.get_state()), - num_inserted, - current_index, - pending: pending_in_batch, - items_in_current_zkp_batch: batch.get_num_inserted_zkp_batch(), - }); - - total_unprocessed += pending_in_batch; - } - - let pending_batches = total_unprocessed; + let parsed = parse_batch_metadata(&queue.batch_metadata.batches); Ok(V2QueueInfo { next_index, pending_batch_index: queue.batch_metadata.pending_batch_index, - zkp_batch_size, - batches: batch_infos, + zkp_batch_size: parsed.zkp_batch_size, + batches: parsed.batch_infos, input_pending_batches: 0, - output_pending_batches: pending_batches, + output_pending_batches: parsed.total_pending_batches, input_items_in_current_zkp_batch: 0, - output_items_in_current_zkp_batch, + output_items_in_current_zkp_batch: parsed.items_in_current_zkp_batch, }) } else { Err(anyhow::anyhow!("account not found")) diff --git a/forester/src/tree_data_sync.rs b/forester/src/tree_data_sync.rs index b852e01e59..14f02cb5d7 100644 --- a/forester/src/tree_data_sync.rs +++ b/forester/src/tree_data_sync.rs @@ -316,16 +316,9 @@ fn create_tree_accounts( tree_accounts } -pub fn get_registered_program_pda(program_id: &Pubkey) -> Pubkey { - Pubkey::find_program_address( - &[program_id.to_bytes().as_slice()], - &account_compression::ID, - ) - .0 -} - pub async fn fetch_protocol_group_authority(rpc: &R) -> Result { - let registered_program_pda = 
get_registered_program_pda(&light_registry::ID); + let registered_program_pda = + light_registry::account_compression_cpi::sdk::get_registered_program_pda(&light_registry::ID); let account = rpc .get_account(registered_program_pda) diff --git a/forester/static/dashboard.html b/forester/static/dashboard.html index 0a1ac7ad98..afffc8e4aa 100644 --- a/forester/static/dashboard.html +++ b/forester/static/dashboard.html @@ -679,6 +679,8 @@

Forester Dashboard