diff --git a/Cargo.lock b/Cargo.lock index b0028265d..5c7d0cc7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -899,6 +899,25 @@ dependencies = [ "worker", ] +[[package]] +name = "ampq" +version = "0.1.0" +dependencies = [ + "amp-client", + "anyhow", + "chrono", + "clap", + "common", + "crossterm 0.29.0", + "futures", + "hex", + "ratatui", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "ampsync" version = "0.1.0" @@ -2196,6 +2215,21 @@ dependencies = [ "thiserror 2.0.17", ] +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.45" @@ -2389,7 +2423,7 @@ dependencies = [ "crossterm 0.28.1", "strum 0.26.3", "strum_macros 0.26.4", - "unicode-width", + "unicode-width 0.2.0", ] [[package]] @@ -2431,6 +2465,20 @@ dependencies = [ "uuid", ] +[[package]] +name = "compact_str" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b79c4069c6cad78e2e0cdfcbd26275770669fb39fd308a752dc110e83b9af32" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2449,7 +2497,7 @@ dependencies = [ "encode_unicode", "libc", "once_cell", - "unicode-width", + "unicode-width 0.2.0", "windows-sys 0.59.0", ] @@ -2462,7 +2510,7 @@ dependencies = [ "encode_unicode", "libc", "once_cell", - "unicode-width", + "unicode-width 0.2.0", "windows-sys 0.61.2", ] @@ -2590,6 +2638,15 @@ dependencies = [ "worker", ] +[[package]] +name = "convert_case" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb402b8d4c85569410425650ce3eddc7d698ed96d39a73f941b08fb63082f1e7" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -2693,8 +2750,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" dependencies = [ "bitflags", + "crossterm_winapi", + "mio", "parking_lot", "rustix 0.38.44", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" +dependencies = [ + "bitflags", + "crossterm_winapi", + "derive_more", + "document-features", + "mio", + "parking_lot", + "rustix 1.1.2", + "signal-hook", + "signal-hook-mio", + "winapi", ] [[package]] @@ -3589,7 +3669,7 @@ dependencies = [ "pin-project", "tracing", "tracing-futures", - "unicode-width", + "unicode-width 0.2.0", ] [[package]] @@ -3801,6 +3881,7 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" dependencies = [ + "convert_case", "proc-macro2", "quote", "syn 2.0.110", @@ -3875,6 +3956,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" +[[package]] +name = "document-features" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" +dependencies = [ + "litrs", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -5264,7 +5354,7 @@ dependencies = [ "console 0.15.11", "number_prefix", "portable-atomic", - "unicode-width", + "unicode-width 0.2.0", "web-time", ] @@ -5292,6 +5382,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instability" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435d80800b936787d62688c927b6490e887c7ef5ff9ce922c6c6050fca75eb9a" +dependencies = [ + "darling 0.20.11", + "indoc", + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -5599,6 +5702,12 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "litrs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" + [[package]] name = "lmdb-master-sys" version = "0.2.5" @@ -5625,6 +5734,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru" version = "0.13.0" @@ -5857,6 +5975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.61.2", ] @@ -7314,6 +7433,27 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "ratatui" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b" +dependencies = [ + "bitflags", + "cassowary", + "compact_str", + "crossterm 0.28.1", + "indoc", + "instability", + "itertools 0.13.0", + "lru 0.12.5", + "paste", + "strum 0.26.3", + "unicode-segmentation", + "unicode-truncate", + "unicode-width 0.2.0", +] + [[package]] name = "raw-cpuid" version = "11.6.0" @@ -8158,6 +8298,27 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.6" @@ -8570,6 +8731,9 @@ name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", +] [[package]] name = "strum" @@ -9535,11 +9699,28 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-truncate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" +dependencies = [ + "itertools 0.13.0", + "unicode-segmentation", + "unicode-width 0.1.14", +] + [[package]] name = "unicode-width" -version = "0.2.2" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + +[[package]] +name = "unicode-width" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" [[package]] name = "unicode-xid" diff --git a/Cargo.toml b/Cargo.toml index f300efc28..535f873dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/auth/http", "crates/bin/ampctl", "crates/bin/ampd", + "crates/bin/ampq", "crates/bin/ampsync", "crates/bin/ampup", "crates/client", diff --git a/crates/bin/ampq/Cargo.toml b/crates/bin/ampq/Cargo.toml new file mode 100644 index 000000000..c10490d1e --- /dev/null +++ b/crates/bin/ampq/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "ampq" +edition.workspace = true +version.workspace = true +license-file.workspace = true + +[dependencies] +amp-client = { version = "0.1.0", path = "../../client", features = ["lmdb"] } +anyhow.workspace = true +chrono.workspace = true +clap = { workspace = true, features = ["derive"] } +common = { version = "0.1.0", path = "../../core/common" } +crossterm = "0.29.0" +futures.workspace = true +hex.workspace = true +ratatui = "0.29.0" +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true diff --git a/crates/bin/ampq/src/app.rs b/crates/bin/ampq/src/app.rs new file mode 100644 index 000000000..50c37b529 --- /dev/null +++ b/crates/bin/ampq/src/app.rs @@ -0,0 +1,423 @@ +//! Application state management + +use std::path::PathBuf; + +use anyhow::Result; +use crossterm::event::{Event, KeyCode, KeyEvent}; + +use crate::stream::StreamHandler; + +/// CLI arguments (stored for reconnection) +#[derive(Debug, Clone)] +pub struct AppArgs { + pub endpoint: String, + pub query: Option, + pub buffer_size: usize, + pub output: Option, + pub lmdb: Option, +} + +impl From for AppArgs { + fn from(args: crate::Args) -> Self { + Self { + endpoint: args.endpoint, + query: args.query, + buffer_size: args.buffer_size, + output: args.output, + lmdb: args.lmdb, + } + } +} + +/// Application state +pub struct App { + /// CLI arguments (for reconnection) + pub args: AppArgs, + + /// Current connection status + pub status: ConnectionStatus, + + /// Data rows for display + pub rows: Vec, + + /// Current vertical scroll offset (row index) + pub scroll_offset: usize, + + /// Current horizontal scroll offset (column index) + pub horizontal_scroll_offset: usize, + + /// Whether the stream is paused + pub paused: bool, + + /// Whether to quit the application + pub should_quit: bool, + + /// Stream handler for receiving data + pub stream_handler: StreamHandler, + + /// Whether currently connected + pub is_connected: bool, + + /// Whether to show the help panel + pub show_help: bool, +} + +/// Connection status +#[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] +pub enum ConnectionStatus { + Disconnected, + Connecting, + Connected, + Error(String), +} + +/// A single row of data +#[derive(Debug, Clone, PartialEq)] +pub struct Row { + /// Row data as key-value pairs + pub data: Vec<(String, String)>, + + /// Row type (Insert, Delete, or Normal) + pub row_type: RowType, + + /// Transaction ID this row belongs to (for CDC tracking) + pub transaction_id: Option, +} + +/// Type of row +#[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] +pub enum RowType { + Normal, + Insert, + Delete, +} + +impl App { + /// Create a new application instance + pub fn new(args: impl Into) -> Self { + let args = args.into(); + + // Create stream handler - start streaming if query is provided + let (stream_handler, is_connected) = if args.query.is_some() { + ( + StreamHandler::new_with_stream( + args.endpoint.clone(), + args.query.clone().unwrap(), + args.output.clone(), + args.lmdb.clone(), + ), + true, + ) + } else { + (StreamHandler::new(), false) + }; + + // Start with empty rows if streaming, otherwise show mock data + let rows = if args.query.is_some() { + Vec::new() + } else { + Self::generate_mock_data() + }; + + Self { + args, + status: ConnectionStatus::Disconnected, + rows, + scroll_offset: 0, + horizontal_scroll_offset: 0, + paused: false, + should_quit: false, + stream_handler, + is_connected, + show_help: false, + } + } + + /// Manually disconnect from the stream + pub fn disconnect(&mut self) { + tracing::info!("User requested disconnect"); + self.stream_handler = StreamHandler::new(); + self.is_connected = false; + self.status = ConnectionStatus::Disconnected; + } + + /// Manually reconnect to the stream + pub fn reconnect(&mut self) { + if let Some(ref query) = self.args.query { + tracing::info!("User requested reconnect"); + self.stream_handler = StreamHandler::new_with_stream( + self.args.endpoint.clone(), + query.clone(), + self.args.output.clone(), + self.args.lmdb.clone(), + ); + self.is_connected = true; + self.status = ConnectionStatus::Connecting; + } else { + tracing::warn!("Cannot reconnect: no query specified"); + } + } + + /// Generate mock data for initial display + fn generate_mock_data() -> Vec { + // Generate 100 rows of mock data for testing scrolling + // Reversed so newest (highest block number) is at index 0 + (0..100) + .rev() + .map(|i| { + let block_num = 12345678 + i; + Row { + data: vec![ + ("block_number".to_string(), block_num.to_string()), + ( + "hash".to_string(), + format!("0x{:016x}...", block_num * 123456), + ), + ( + "timestamp".to_string(), + format!("2024-01-15 10:{}:{:02}", 30 + i / 60, i % 60), + ), + ("gas_used".to_string(), format!("{}", 21000 + i * 1000)), + ("miner".to_string(), format!("0x{:040x}", i * 999999)), + ], + row_type: match i % 10 { + 0 => RowType::Insert, + 9 => RowType::Delete, + _ => RowType::Normal, + }, + transaction_id: None, + } + }) + .collect() + } + + /// Handle terminal events + pub async fn handle_event(&mut self, event: Event) -> Result { + if let Event::Key(key) = event { + return Ok(self.handle_key_event(key)); + } + Ok(true) + } + + /// Handle keyboard input + fn handle_key_event(&mut self, key: KeyEvent) -> bool { + match key.code { + KeyCode::Char('q') | KeyCode::Esc => { + self.should_quit = true; + false + } + KeyCode::Char(' ') => { + self.paused = !self.paused; + true + } + KeyCode::Char('d') => { + // Disconnect + self.disconnect(); + true + } + KeyCode::Char('r') => { + // Reconnect + self.reconnect(); + true + } + KeyCode::Char('?') | KeyCode::F(1) => { + // Toggle help + self.show_help = !self.show_help; + true + } + // Vertical scrolling + KeyCode::Down | KeyCode::Char('j') => { + self.scroll_down(); + true + } + KeyCode::Up | KeyCode::Char('k') => { + self.scroll_up(); + true + } + KeyCode::PageDown => { + self.scroll_down_page(); + true + } + KeyCode::PageUp => { + self.scroll_up_page(); + true + } + // Horizontal scrolling + KeyCode::Left | KeyCode::Char('h') => { + self.scroll_left(); + true + } + KeyCode::Right | KeyCode::Char('l') => { + self.scroll_right(); + true + } + // Jump to edges + KeyCode::Home => { + self.scroll_offset = 0; + self.horizontal_scroll_offset = 0; + true + } + KeyCode::End => { + self.scroll_to_end(); + true + } + _ => true, + } + } + + /// Scroll down by one row + fn scroll_down(&mut self) { + if self.scroll_offset < self.rows.len().saturating_sub(1) { + self.scroll_offset += 1; + } + } + + /// Scroll up by one row + fn scroll_up(&mut self) { + self.scroll_offset = self.scroll_offset.saturating_sub(1); + } + + /// Scroll down by a page + fn scroll_down_page(&mut self) { + self.scroll_offset = (self.scroll_offset + 10).min(self.rows.len().saturating_sub(1)); + } + + /// Scroll up by a page + fn scroll_up_page(&mut self) { + self.scroll_offset = self.scroll_offset.saturating_sub(10); + } + + /// Scroll to the end + fn scroll_to_end(&mut self) { + self.scroll_offset = self.rows.len().saturating_sub(1); + } + + /// Scroll left by one column + fn scroll_left(&mut self) { + self.horizontal_scroll_offset = self.horizontal_scroll_offset.saturating_sub(1); + } + + /// Scroll right by one column + fn scroll_right(&mut self) { + let num_columns = self.get_columns().len(); + if num_columns > 0 && self.horizontal_scroll_offset < num_columns - 1 { + self.horizontal_scroll_offset += 1; + } + } + + /// Update application state (called every frame) + pub async fn update(&mut self) -> Result<()> { + // Check for new stream messages + while let Some(message) = self.stream_handler.try_recv() { + use crate::stream::{StreamMessage, StreamStatus}; + + match message { + StreamMessage::Data(mut new_rows) => { + if !self.paused { + // Insert new rows at the beginning (newest first) + new_rows.reverse(); // Reverse so newest is first + self.rows.splice(0..0, new_rows); + + // Trim buffer to max size if needed (remove oldest from end) + if self.rows.len() > self.args.buffer_size { + let new_len = self.args.buffer_size; + self.rows.truncate(new_len); + // No need to adjust scroll offset since we're trimming from the end + } + } + } + StreamMessage::Reorg { + transaction_ids, + message: _, + } => { + if !self.paused { + // Mark all rows with matching transaction IDs as deleted (red) + for row in &mut self.rows { + if let Some(ref row_tx_id) = row.transaction_id + && transaction_ids.contains(row_tx_id) + { + row.row_type = RowType::Delete; + } + } + } + } + StreamMessage::StatusChanged(stream_status) => { + self.status = match stream_status { + StreamStatus::Connecting => ConnectionStatus::Connecting, + StreamStatus::Connected => ConnectionStatus::Connected, + StreamStatus::Disconnected => ConnectionStatus::Disconnected, + StreamStatus::Error(err) => ConnectionStatus::Error(err), + }; + } + StreamMessage::Error(err) => { + self.status = ConnectionStatus::Error(err); + } + } + } + + Ok(()) + } + + /// Get the column names from the first row + pub fn get_columns(&self) -> Vec { + self.rows + .first() + .map(|row| row.data.iter().map(|(k, _)| k.clone()).collect()) + .unwrap_or_default() + } + + /// Get visible column names based on horizontal scroll and available width + pub fn get_visible_columns(&self, available_width: usize) -> Vec<(usize, String, usize)> { + let all_columns = self.get_columns(); + if all_columns.is_empty() { + return Vec::new(); + } + + let mut visible = Vec::new(); + let mut used_width = 0; + let padding = 3; // Space for padding and borders + + // Start from horizontal scroll offset + for (idx, col_name) in all_columns + .iter() + .enumerate() + .skip(self.horizontal_scroll_offset) + { + // Calculate column width based on content + let col_width = self.calculate_column_width(idx, col_name); + + if used_width + col_width + padding > available_width && !visible.is_empty() { + break; + } + + visible.push((idx, col_name.clone(), col_width)); + used_width += col_width + padding; + } + + visible + } + + /// Calculate the width needed for a column + fn calculate_column_width(&self, col_idx: usize, col_name: &str) -> usize { + let mut max_width = col_name.len(); + + // Check the first 100 rows to determine width (for performance) + for row in self.rows.iter().take(100) { + if let Some((_, value)) = row.data.get(col_idx) { + max_width = max_width.max(value.len()); + } + } + + // Cap at reasonable max width + max_width.min(50) + } + + /// Get visible rows based on scroll offset + pub fn get_visible_rows(&self, max_rows: usize) -> &[Row] { + let start = self.scroll_offset; + let end = (start + max_rows).min(self.rows.len()); + &self.rows[start..end] + } +} diff --git a/crates/bin/ampq/src/main.rs b/crates/bin/ampq/src/main.rs new file mode 100644 index 000000000..beae923e2 --- /dev/null +++ b/crates/bin/ampq/src/main.rs @@ -0,0 +1,77 @@ +//! Amp TUI streaming client +//! +//! A terminal user interface for visualizing streaming data from Amp servers. +//! Displays real-time data updates, inserts, deletes, and reorg events. + +use anyhow::Result; +use clap::Parser; + +mod app; +mod stream; +mod ui; + +use app::App; + +/// Amp TUI streaming query client +#[derive(Parser, Debug, Clone)] +#[command(name = "ampq")] +#[command(about = "TUI client for streaming queries from Amp", long_about = None)] +struct Args { + /// Arrow Flight endpoint URL + #[arg(short, long, default_value = "http://localhost:1602")] + endpoint: String, + + /// SQL query to execute + #[arg(short, long)] + query: Option, + + /// Maximum number of rows to keep in buffer + #[arg(short, long, default_value = "1000")] + buffer_size: usize, + + /// Output JSONL file path (if specified, writes CDC events to file) + #[arg(short, long)] + output: Option, + + /// LMDB database path for persistent state and batch storage + #[arg(short, long)] + lmdb: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Set up terminal + let mut terminal = ui::setup_terminal()?; + + // Create app state + let mut app = App::new(args.clone()); + + // Run the TUI application + let result = run_app(&mut terminal, &mut app).await; + + // Restore terminal + ui::restore_terminal(&mut terminal)?; + + result +} + +async fn run_app(terminal: &mut ui::Terminal, app: &mut App) -> Result<()> { + loop { + // Draw the UI + terminal.draw(|frame| ui::draw(frame, app))?; + + // Handle events + if let Some(event) = ui::poll_event()? + && !app.handle_event(event).await? + { + break; + } + + // Update app state (e.g., check for new streaming data) + app.update().await?; + } + + Ok(()) +} diff --git a/crates/bin/ampq/src/stream.rs b/crates/bin/ampq/src/stream.rs new file mode 100644 index 000000000..95cc57998 --- /dev/null +++ b/crates/bin/ampq/src/stream.rs @@ -0,0 +1,782 @@ +//! Streaming data handling +//! +//! This module handles the connection to the Amp server and processes +//! streaming data events. + +use std::{ + fs::File, + io::{BufWriter, Write}, + path::PathBuf, +}; + +use amp_client::AmpClient; +use anyhow::Result; +use common::arrow::array::RecordBatch; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::sync::mpsc; + +use crate::app::{Row, RowType}; + +/// Represents a CDC row written to JSONL +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "operation")] +enum CdcRowJson { + #[serde(rename = "insert")] + Insert { + transaction_id: String, + timestamp: String, + data: Value, + }, + #[serde(rename = "delete")] + Delete { + transaction_id: String, + timestamp: String, + deleted_transaction_id: String, + data: Value, + }, +} + +/// Stream message sent from the streaming task to the UI +#[derive(Debug, Clone)] +pub enum StreamMessage { + /// New data batch received + Data(Vec), + + /// Connection status changed + StatusChanged(StreamStatus), + + /// Reorg notification + Reorg { + transaction_ids: Vec, + #[allow(dead_code)] + message: String, + }, + + /// Error occurred + Error(String), +} + +/// Stream connection status +#[derive(Debug, Clone, PartialEq)] +#[allow(dead_code)] +pub enum StreamStatus { + Connecting, + Connected, + Disconnected, + Error(String), +} + +/// Stream handler that manages the connection and data flow +pub struct StreamHandler { + /// Receiver for stream messages + receiver: mpsc::UnboundedReceiver, +} + +impl StreamHandler { + /// Create a new stream handler without starting a stream + pub fn new() -> Self { + let (_sender, receiver) = mpsc::unbounded_channel(); + Self { receiver } + } + + /// Create a new stream handler and start streaming from the endpoint + pub fn new_with_stream( + endpoint: String, + query: String, + output_file: Option, + lmdb_path: Option, + ) -> Self { + let (sender, receiver) = mpsc::unbounded_channel(); + + // Spawn streaming task + tokio::spawn(async move { + if let Err(e) = + start_stream_task(endpoint, query, sender.clone(), output_file, lmdb_path).await + { + tracing::error!("❌ Stream task failed: {:?}", e); + let error_msg = format!("Stream error: {}", e); + if let Err(send_err) = sender.send(StreamMessage::Error(error_msg)) { + tracing::error!("Failed to send error message to UI: {}", send_err); + } + } + }); + + Self { receiver } + } + + /// Try to receive a message from the stream (non-blocking) + pub fn try_recv(&mut self) -> Option { + self.receiver.try_recv().ok() + } +} + +impl Default for StreamHandler { + fn default() -> Self { + Self::new() + } +} + +/// Start the streaming task with CDC +async fn start_stream_task( + endpoint: String, + query: String, + sender: mpsc::UnboundedSender, + output_file: Option, + lmdb_path: Option, +) -> Result<()> { + use amp_client::CdcEvent; + + tracing::info!("🔌 Starting stream task for endpoint: {}", endpoint); + tracing::info!("📝 Query: {}", query); + + // Open output file if specified (always append mode for reconnection support) + let mut jsonl_writer = if let Some(path) = output_file { + tracing::info!("📄 Writing CDC events to: {}", path.display()); + let file = File::options().create(true).append(true).open(&path)?; + tracing::info!("📄 Using append mode (reconnection-safe)"); + Some(BufWriter::new(file)) + } else { + tracing::info!("📄 No output file specified"); + None + }; + + // Send connecting status + if let Err(e) = sender.send(StreamMessage::StatusChanged(StreamStatus::Connecting)) { + tracing::error!("Failed to send connecting status: {}", e); + return Err(e.into()); + } + + // Connect to the server + tracing::info!("🔗 Connecting to server at: {}", endpoint); + let client = AmpClient::from_endpoint(&endpoint).await?; + tracing::info!("✅ Connected to server successfully"); + + // Send connected status + if let Err(e) = sender.send(StreamMessage::StatusChanged(StreamStatus::Connected)) { + tracing::error!("Failed to send connected status: {}", e); + return Err(e.into()); + } + + // Create CDC stream with state stores (LMDB if path provided, otherwise in-memory) + tracing::info!("🚀 Creating CDC stream"); + + let mut stream = if let Some(lmdb_path) = lmdb_path { + use amp_client::store::{LmdbBatchStore, LmdbStateStore, open_lmdb_env}; + + tracing::info!("💾 Using LMDB storage at: {}", lmdb_path.display()); + + // Create parent directory if it doesn't exist + if let Some(parent) = lmdb_path.parent() { + std::fs::create_dir_all(parent)?; + } + + // Open LMDB environment + let env = open_lmdb_env(&lmdb_path)?; + let state_store = LmdbStateStore::new(env.clone())?; + let batch_store = LmdbBatchStore::new(env)?; + + tracing::info!("📡 Executing streaming query with persistent storage..."); + client + .stream(&query) + .cdc(state_store, batch_store, u64::MAX) + .await? + } else { + use amp_client::{InMemoryBatchStore, InMemoryStateStore}; + + tracing::info!("💾 Using in-memory storage (state will not persist)"); + let state_store = InMemoryStateStore::new(); + let batch_store = InMemoryBatchStore::new(); + + tracing::info!("📡 Executing streaming query..."); + client + .stream(&query) + .cdc(state_store, batch_store, u64::MAX) + .await? + }; + + tracing::info!("✅ CDC stream created successfully, waiting for events..."); + + // Process CDC events + let mut event_count = 0; + let mut total_inserts = 0; + let mut total_deletes = 0; + + loop { + let result = stream.next().await; + + // Check if stream ended + let Some(result) = result else { + tracing::warn!("⚠️ Stream ended (returned None) - server may have closed connection"); + sender + .send(StreamMessage::Error("Stream closed by server".to_string())) + .ok(); + break; + }; + + match result { + Ok((event, commit)) => { + event_count += 1; + + match event { + CdcEvent::Insert { id, batch } => { + // Convert RecordBatch to rows with Insert type + let tx_id = format!("{:?}", id); + let rows = match convert_batch_to_rows( + &batch, + RowType::Insert, + Some(tx_id.clone()), + ) { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to convert batch to rows: {}", e); + // Continue without crashing - just log and skip this batch + continue; + } + }; + total_inserts += rows.len(); + + tracing::info!( + "➕ Insert event #{}: {} rows (id: {:?}) | Total inserts: {}", + event_count, + rows.len(), + id, + total_inserts + ); + + // Write to JSONL file if configured (one line per row) + if let Some(ref mut writer) = jsonl_writer { + let timestamp = chrono::Utc::now().to_rfc3339(); + match batch_to_json(&batch) { + Ok(json_rows) => { + for row_data in json_rows { + let row_json = CdcRowJson::Insert { + transaction_id: tx_id.clone(), + timestamp: timestamp.clone(), + data: row_data, + }; + if let Err(e) = write_json_line(writer, &row_json) { + tracing::error!("Failed to write JSONL row: {}", e); + // Continue processing - don't crash the stream + } + } + } + Err(e) => { + tracing::error!("Failed to convert batch to JSON: {}", e); + // Continue processing - don't crash the stream + } + } + } + + // Send to UI - if channel is closed, stream should end + if sender.send(StreamMessage::Data(rows)).is_err() { + tracing::warn!("UI channel closed, ending stream"); + break; + } + + // Commit the event + if let Err(e) = commit.await { + tracing::error!("Failed to commit insert event: {}", e); + // This is a critical error - break the stream + sender + .send(StreamMessage::Error(format!("Commit failed: {}", e))) + .ok(); + break; + } + } + CdcEvent::Delete { mut batches, id } => { + tracing::warn!( + "➖ Delete event #{}: Reorg/rewind (id: {:?})", + event_count, + id + ); + + // Collect all deleted transaction IDs and batches + let mut deleted_tx_ids = Vec::new(); + let mut deleted_batches = Vec::new(); + let mut row_count = 0; + while let Some(result) = batches.next().await { + match result { + Ok((batch_id, batch)) => { + let tx_id = format!("{:?}", batch_id); + let num_rows = batch.num_rows(); + deleted_tx_ids.push(tx_id.clone()); + deleted_batches.push((tx_id, batch)); + row_count += num_rows; + total_deletes += num_rows; + } + Err(e) => { + tracing::error!("Error loading delete batch: {}", e); + // Continue loading other batches + } + } + } + + tracing::info!( + " Deleted {} rows from {} batches | Total deletes: {}", + row_count, + deleted_tx_ids.len(), + total_deletes + ); + + // Write to JSONL file if configured (one line per deleted row) + if let Some(ref mut writer) = jsonl_writer { + let timestamp = chrono::Utc::now().to_rfc3339(); + for (deleted_tx_id, batch) in &deleted_batches { + match batch_to_json(batch) { + Ok(json_rows) => { + for row_data in json_rows { + let row_json = CdcRowJson::Delete { + transaction_id: format!("{:?}", id), + timestamp: timestamp.clone(), + deleted_transaction_id: deleted_tx_id.clone(), + data: row_data, + }; + if let Err(e) = write_json_line(writer, &row_json) { + tracing::error!( + "Failed to write JSONL delete row: {}", + e + ); + // Continue processing - don't crash the stream + } + } + } + Err(e) => { + tracing::error!( + "Failed to convert delete batch to JSON: {}", + e + ); + // Continue processing - don't crash the stream + } + } + } + } + + if !deleted_tx_ids.is_empty() { + // Send to UI - if channel is closed, stream should end + if sender + .send(StreamMessage::Reorg { + transaction_ids: deleted_tx_ids, + message: format!("Reorg: {} rows deleted", row_count), + }) + .is_err() + { + tracing::warn!("UI channel closed, ending stream"); + break; + } + } + + // Commit the event + if let Err(e) = commit.await { + tracing::error!("Failed to commit delete event: {}", e); + // This is a critical error - break the stream + sender + .send(StreamMessage::Error(format!("Commit failed: {}", e))) + .ok(); + break; + } + } + } + } + Err(e) => { + tracing::error!( + "❌ Stream error after {} events (+{} -{} rows): {}", + event_count, + total_inserts, + total_deletes, + e + ); + sender.send(StreamMessage::Error(format!("Stream error: {}", e)))?; + break; + } + } + } + + tracing::info!( + "🏁 Stream ended normally. Total: {} events, {} inserts, {} deletes", + event_count, + total_inserts, + total_deletes + ); + + if let Err(e) = sender.send(StreamMessage::StatusChanged(StreamStatus::Disconnected)) { + tracing::warn!("Failed to send disconnected status (channel closed): {}", e); + } + + Ok(()) +} + +/// Convert Arrow RecordBatch to UI Row format +fn convert_batch_to_rows( + batch: &RecordBatch, + row_type: RowType, + transaction_id: Option, +) -> Result> { + let mut rows = Vec::new(); + let schema = batch.schema(); + + for row_idx in 0..batch.num_rows() { + let mut data = Vec::new(); + + for (col_idx, field) in schema.fields().iter().enumerate() { + let column = batch.column(col_idx); + let value = format_arrow_value(column.as_ref(), row_idx)?; + data.push((field.name().clone(), value)); + } + + rows.push(Row { + data, + row_type: row_type.clone(), + transaction_id: transaction_id.clone(), + }); + } + + Ok(rows) +} + +/// Format an Arrow array value as a string +fn format_arrow_value(column: &dyn common::arrow::array::Array, row_idx: usize) -> Result { + use common::arrow::{ + array::{ + BinaryArray, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, + Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, + StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, + UInt64Array, + }, + datatypes::{DataType, TimeUnit}, + }; + + if column.is_null(row_idx) { + return Ok("NULL".to_string()); + } + + let value = match column.data_type() { + DataType::Int8 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Int16 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Int32 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Int64 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::UInt8 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::UInt16 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::UInt32 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::UInt64 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Float32 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Float64 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Boolean => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Utf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::LargeUtf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Binary => { + let array = column.as_any().downcast_ref::().unwrap(); + let bytes = array.value(row_idx); + format!("0x{}", hex::encode(bytes)) + } + DataType::LargeBinary => { + let array = column.as_any().downcast_ref::().unwrap(); + let bytes = array.value(row_idx); + format!("0x{}", hex::encode(bytes)) + } + DataType::Date32 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Date64 => { + let array = column.as_any().downcast_ref::().unwrap(); + array.value(row_idx).to_string() + } + DataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + format_timestamp(array.value(row_idx), 1_000_000_000) + } + TimeUnit::Millisecond => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + format_timestamp(array.value(row_idx), 1_000_000) + } + TimeUnit::Microsecond => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + format_timestamp(array.value(row_idx), 1_000) + } + TimeUnit::Nanosecond => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); + format_timestamp(array.value(row_idx), 1) + } + }, + // For complex types, try to extract values intelligently + DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _) => { + // For lists, show array representation + let sliced = column.slice(row_idx, 1); + format_array_value(&sliced) + } + DataType::Struct(_) => { + // For structs, show field values + use common::arrow::array::StructArray; + if let Some(struct_array) = column.as_any().downcast_ref::() { + let values: Vec = (0..struct_array.num_columns()) + .map(|col_idx| { + let col = struct_array.column(col_idx); + format_arrow_value(col.as_ref(), row_idx) + .unwrap_or_else(|_| "?".to_string()) + }) + .collect(); + format!("{{{}}}", values.join(", ")) + } else { + format!("{:?}", column.slice(row_idx, 1)) + } + } + DataType::Decimal128(_, scale) | DataType::Decimal256(_, scale) => { + // For decimals, show numeric value with scale + let sliced = column.slice(row_idx, 1); + format!("{} (scale: {})", format_array_value(&sliced), scale) + } + // Fallback: try to extract a string representation + _ => { + // Try array cast to string + let sliced = column.slice(row_idx, 1); + format_array_value(&sliced) + } + }; + + Ok(value) +} + +/// Format an array value using Arrow's display formatting +fn format_array_value(array: &dyn common::arrow::array::Array) -> String { + use common::arrow::util::display::{ArrayFormatter, FormatOptions}; + + // Create formatter with default options + let options = FormatOptions::default(); + match ArrayFormatter::try_new(array, &options) { + Ok(formatter) => { + // Format the first (and only) value + formatter.value(0).to_string() + } + Err(_) => { + // Fallback to debug representation + format!("{:?}", array) + } + } +} + +/// Format a timestamp value as a human-readable string +fn format_timestamp(value: i64, nanos_divisor: i64) -> String { + use chrono::DateTime; + + let nanos = value * nanos_divisor; + if let Some(dt) = + DateTime::from_timestamp(nanos / 1_000_000_000, (nanos % 1_000_000_000) as u32) + { + dt.format("%Y-%m-%d %H:%M:%S").to_string() + } else { + value.to_string() + } +} + +/// Convert a RecordBatch to JSON representation +fn batch_to_json(batch: &RecordBatch) -> Result> { + let mut rows = Vec::new(); + let schema = batch.schema(); + + for row_idx in 0..batch.num_rows() { + let mut row_obj = serde_json::Map::new(); + + for (col_idx, field) in schema.fields().iter().enumerate() { + let column = batch.column(col_idx); + let value = arrow_value_to_json(column.as_ref(), row_idx)?; + row_obj.insert(field.name().clone(), value); + } + + rows.push(Value::Object(row_obj)); + } + + Ok(rows) +} + +/// Convert an Arrow value to JSON +fn arrow_value_to_json(column: &dyn common::arrow::array::Array, row_idx: usize) -> Result { + use common::arrow::{ + array::{ + BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, + Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, UInt8Array, + UInt16Array, UInt32Array, UInt64Array, + }, + datatypes::DataType, + }; + + if column.is_null(row_idx) { + return Ok(Value::Null); + } + + let value = match column.data_type() { + DataType::Boolean => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Bool(array.value(row_idx)) + } + DataType::Int8 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Number(array.value(row_idx).into()) + } + DataType::Int16 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Number(array.value(row_idx).into()) + } + DataType::Int32 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Number(array.value(row_idx).into()) + } + DataType::Int64 => { + let array = column.as_any().downcast_ref::().unwrap(); + // JSON doesn't handle i64 well, so convert to string for large values + let val = array.value(row_idx); + if val.abs() < (1i64 << 53) { + Value::Number(val.into()) + } else { + Value::String(val.to_string()) + } + } + DataType::UInt8 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Number(array.value(row_idx).into()) + } + DataType::UInt16 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Number(array.value(row_idx).into()) + } + DataType::UInt32 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::Number(array.value(row_idx).into()) + } + DataType::UInt64 => { + let array = column.as_any().downcast_ref::().unwrap(); + // JSON doesn't handle u64 well, so convert to string for large values + let val = array.value(row_idx); + if val < (1u64 << 53) { + Value::Number(val.into()) + } else { + Value::String(val.to_string()) + } + } + DataType::Float32 => { + let array = column.as_any().downcast_ref::().unwrap(); + let val = array.value(row_idx); + if val.is_finite() { + serde_json::Number::from_f64(val as f64) + .map(Value::Number) + .unwrap_or_else(|| Value::String(val.to_string())) + } else { + Value::String(val.to_string()) + } + } + DataType::Float64 => { + let array = column.as_any().downcast_ref::().unwrap(); + let val = array.value(row_idx); + if val.is_finite() { + serde_json::Number::from_f64(val) + .map(Value::Number) + .unwrap_or_else(|| Value::String(val.to_string())) + } else { + Value::String(val.to_string()) + } + } + DataType::Utf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::String(array.value(row_idx).to_string()) + } + DataType::LargeUtf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + Value::String(array.value(row_idx).to_string()) + } + DataType::Binary => { + let array = column.as_any().downcast_ref::().unwrap(); + let bytes = array.value(row_idx); + Value::String(format!("0x{}", hex::encode(bytes))) + } + DataType::LargeBinary => { + let array = column.as_any().downcast_ref::().unwrap(); + let bytes = array.value(row_idx); + Value::String(format!("0x{}", hex::encode(bytes))) + } + // For complex types, convert to string representation + _ => { + use common::arrow::util::display::{ArrayFormatter, FormatOptions}; + let options = FormatOptions::default(); + let sliced = column.slice(row_idx, 1); + match ArrayFormatter::try_new(&sliced, &options) { + Ok(formatter) => Value::String(formatter.value(0).to_string()), + Err(_) => Value::String(format!("{:?}", sliced)), + } + } + }; + + Ok(value) +} + +/// Write a JSON row to the file +fn write_json_line(writer: &mut W, row: &CdcRowJson) -> Result<()> { + serde_json::to_writer(&mut *writer, row)?; + writeln!(writer)?; // JSONL format requires newline after each JSON object + writer.flush()?; // Flush to ensure data is written immediately + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_stream_handler_creation() { + let mut handler = StreamHandler::new(); + assert!(handler.try_recv().is_none()); + } +} diff --git a/crates/bin/ampq/src/ui.rs b/crates/bin/ampq/src/ui.rs new file mode 100644 index 000000000..9c0b30ae5 --- /dev/null +++ b/crates/bin/ampq/src/ui.rs @@ -0,0 +1,431 @@ +//! Terminal UI rendering and event handling + +use std::{ + io::{self, Stdout}, + time::Duration, +}; + +use anyhow::Result; +use crossterm::{ + event::{self, Event}, + execute, + terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode}, +}; +use ratatui::{ + Frame, + backend::CrosstermBackend, + layout::{Constraint, Direction, Layout, Rect}, + style::{Color, Modifier, Style}, + text::{Line, Span}, + widgets::{Block, Borders, Cell, Paragraph, Row as TableRow, Table}, +}; + +use crate::app::{App, ConnectionStatus, RowType}; + +pub type Terminal = ratatui::Terminal>; + +/// Set up the terminal for TUI +pub fn setup_terminal() -> Result { + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen)?; + let backend = CrosstermBackend::new(stdout); + let terminal = Terminal::new(backend)?; + Ok(terminal) +} + +/// Restore the terminal to normal mode +pub fn restore_terminal(terminal: &mut Terminal) -> Result<()> { + disable_raw_mode()?; + execute!(terminal.backend_mut(), LeaveAlternateScreen)?; + terminal.show_cursor()?; + Ok(()) +} + +/// Poll for terminal events with timeout +pub fn poll_event() -> Result> { + if event::poll(Duration::from_millis(100))? { + Ok(Some(event::read()?)) + } else { + Ok(None) + } +} + +/// Draw the entire UI +pub fn draw(frame: &mut Frame, app: &App) { + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(5), // Header (expanded for storage info) + Constraint::Min(0), // Data table + Constraint::Length(3), // Status bar + ]) + .split(frame.area()); + + draw_header(frame, app, chunks[0]); + draw_data_table(frame, app, chunks[1]); + draw_status_bar(frame, app, chunks[2]); + + // Draw help panel on top if enabled + if app.show_help { + draw_help_panel(frame, app); + } +} + +/// Draw the header with title and connection info +fn draw_header(frame: &mut Frame, app: &App, area: Rect) { + let title = vec![ + Span::styled( + "Amp", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled("TUI Streaming Client", Style::default().fg(Color::White)), + ]; + + let connection_info = format!(" Endpoint: {} ", app.args.endpoint); + + let storage_info = if app.args.lmdb.is_some() { + format!( + " Storage: LMDB ({})", + app.args.lmdb.as_ref().unwrap().display() + ) + } else { + " Storage: In-Memory".to_string() + }; + + let header_text = vec![ + Line::from(title), + Line::from(connection_info), + Line::from(storage_info), + ]; + + let header = Paragraph::new(header_text) + .block(Block::default().borders(Borders::ALL).title("Query Stream")); + + frame.render_widget(header, area); +} + +/// Draw the main data table +fn draw_data_table(frame: &mut Frame, app: &App, area: Rect) { + let all_columns = app.get_columns(); + + if all_columns.is_empty() { + let placeholder = Paragraph::new("No data available. Press 'q' to quit.") + .block(Block::default().borders(Borders::ALL).title("Data")); + frame.render_widget(placeholder, area); + return; + } + + // Calculate available dimensions + let available_height = area.height.saturating_sub(3) as usize; + let available_width = area.width.saturating_sub(2) as usize; // Account for borders + let visible_rows = app.get_visible_rows(available_height); + + // Get visible columns based on horizontal scroll + let visible_columns = app.get_visible_columns(available_width); + + if visible_columns.is_empty() { + let placeholder = Paragraph::new("No columns to display.") + .block(Block::default().borders(Borders::ALL).title("Data")); + frame.render_widget(placeholder, area); + return; + } + + // Create header row with visible columns + let header_cells: Vec = visible_columns + .iter() + .map(|(_, col_name, _)| { + Cell::from(col_name.as_str()).style(Style::default().fg(Color::Yellow)) + }) + .collect(); + let header = TableRow::new(header_cells) + .style(Style::default().add_modifier(Modifier::BOLD)) + .height(1); + + // Create data rows with only visible columns + let rows: Vec = visible_rows + .iter() + .map(|row| { + let cells: Vec = visible_columns + .iter() + .map(|(col_idx, _, _)| { + let value = row + .data + .get(*col_idx) + .map(|(_, v)| v.as_str()) + .unwrap_or(""); + Cell::from(value) + }) + .collect(); + + let style = match row.row_type { + RowType::Insert => Style::default().fg(Color::Green), + RowType::Delete => Style::default().fg(Color::Red), + RowType::Normal => Style::default(), + }; + + TableRow::new(cells).style(style) + }) + .collect(); + + // Calculate column widths based on actual content + let widths: Vec = visible_columns + .iter() + .map(|(_, _, width)| Constraint::Length(*width as u16)) + .collect(); + + // Build title with scroll indicators + let total_cols = all_columns.len(); + let showing_cols = visible_columns.len(); + let first_col_idx = visible_columns + .first() + .map(|(idx, _, _)| idx + 1) + .unwrap_or(0); + let last_col_idx = visible_columns + .last() + .map(|(idx, _, _)| idx + 1) + .unwrap_or(0); + + let title = if total_cols > showing_cols { + format!( + "Data (Rows {}-{}/{}, Cols {}-{}/{})", + app.scroll_offset + 1, + (app.scroll_offset + visible_rows.len()).min(app.rows.len()), + app.rows.len(), + first_col_idx, + last_col_idx, + total_cols + ) + } else { + format!( + "Data (Rows {}-{}/{})", + app.scroll_offset + 1, + (app.scroll_offset + visible_rows.len()).min(app.rows.len()), + app.rows.len() + ) + }; + + let table = Table::new(rows, widths) + .header(header) + .block(Block::default().borders(Borders::ALL).title(title)) + .style(Style::default().fg(Color::White)); + + frame.render_widget(table, area); +} + +/// Draw the status bar with controls and status +fn draw_status_bar(frame: &mut Frame, app: &App, area: Rect) { + let status_text = match &app.status { + ConnectionStatus::Connected => Span::styled("Connected", Style::default().fg(Color::Green)), + ConnectionStatus::Connecting => { + Span::styled("Connecting...", Style::default().fg(Color::Yellow)) + } + ConnectionStatus::Disconnected => { + Span::styled("Disconnected", Style::default().fg(Color::Red)) + } + ConnectionStatus::Error(err) => { + Span::styled(format!("Error: {}", err), Style::default().fg(Color::Red)) + } + }; + + let paused_text = if app.paused { + Span::styled(" [PAUSED]", Style::default().fg(Color::Yellow)) + } else { + Span::raw("") + }; + + // Show buffer info + let buffer_info = format!( + " | Buffer: {}/{} rows", + app.rows.len(), + app.args.buffer_size + ); + let buffer_text = Span::styled(buffer_info, Style::default().fg(Color::Cyan)); + + let controls = if app.is_connected { + " [?]Help [q]Quit [d]Disconnect [Space]Pause [↑↓/jk]Scroll [PgUp/PgDn]Page [Home/End]Jump " + } else { + " [?]Help [q]Quit [r]Reconnect [Space]Pause [↑↓/jk]Scroll [PgUp/PgDn]Page [Home/End]Jump " + }; + + let status_line = vec![ + Line::from(vec![status_text, paused_text, buffer_text]), + Line::from(controls), + ]; + + let status = + Paragraph::new(status_line).block(Block::default().borders(Borders::ALL).title("Status")); + + frame.render_widget(status, area); +} + +/// Draw help panel overlay +fn draw_help_panel(frame: &mut Frame, _app: &App) { + use ratatui::layout::Alignment; + + // Create centered popup area (80% width, 70% height) + let area = frame.area(); + let popup_width = (area.width * 80) / 100; + let popup_height = (area.height * 70) / 100; + let popup_x = (area.width - popup_width) / 2; + let popup_y = (area.height - popup_height) / 2; + + let popup_area = Rect { + x: popup_x, + y: popup_y, + width: popup_width, + height: popup_height, + }; + + // Help content + let help_text = vec![ + Line::from(vec![Span::styled( + "Keyboard Shortcuts", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + )]), + Line::from(""), + Line::from(vec![Span::styled( + "General", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )]), + Line::from(vec![ + Span::styled(" ? or F1 ", Style::default().fg(Color::Green)), + Span::raw("Toggle this help panel"), + ]), + Line::from(vec![ + Span::styled(" q or Esc ", Style::default().fg(Color::Green)), + Span::raw("Quit the application"), + ]), + Line::from(vec![ + Span::styled(" Space ", Style::default().fg(Color::Green)), + Span::raw("Pause/Resume streaming (display freezes, data still received)"), + ]), + Line::from(""), + Line::from(vec![Span::styled( + "Connection Control", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )]), + Line::from(vec![ + Span::styled(" d ", Style::default().fg(Color::Green)), + Span::raw("Disconnect from stream (stops receiving data)"), + ]), + Line::from(vec![ + Span::styled(" r ", Style::default().fg(Color::Green)), + Span::raw("Reconnect to stream (resumes from last state if using LMDB)"), + ]), + Line::from(""), + Line::from(vec![Span::styled( + "Vertical Scrolling", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )]), + Line::from(vec![ + Span::styled(" ↑ or k ", Style::default().fg(Color::Green)), + Span::raw("Scroll up one row"), + ]), + Line::from(vec![ + Span::styled(" ↓ or j ", Style::default().fg(Color::Green)), + Span::raw("Scroll down one row"), + ]), + Line::from(vec![ + Span::styled(" PgUp ", Style::default().fg(Color::Green)), + Span::raw("Scroll up one page (10 rows)"), + ]), + Line::from(vec![ + Span::styled(" PgDn ", Style::default().fg(Color::Green)), + Span::raw("Scroll down one page (10 rows)"), + ]), + Line::from(""), + Line::from(vec![Span::styled( + "Horizontal Scrolling", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )]), + Line::from(vec![ + Span::styled(" ← or h ", Style::default().fg(Color::Green)), + Span::raw("Scroll left one column"), + ]), + Line::from(vec![ + Span::styled(" → or l ", Style::default().fg(Color::Green)), + Span::raw("Scroll right one column"), + ]), + Line::from(""), + Line::from(vec![Span::styled( + "Navigation", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )]), + Line::from(vec![ + Span::styled(" Home ", Style::default().fg(Color::Green)), + Span::raw("Jump to top-left (first row, first column)"), + ]), + Line::from(vec![ + Span::styled(" End ", Style::default().fg(Color::Green)), + Span::raw("Jump to bottom (last row)"), + ]), + Line::from(""), + Line::from(vec![Span::styled( + "Visual Indicators", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )]), + Line::from(vec![ + Span::styled(" Green rows ", Style::default().fg(Color::Green)), + Span::raw("Insert events (new data)"), + ]), + Line::from(vec![ + Span::styled(" Red rows ", Style::default().fg(Color::Red)), + Span::raw("Delete events (reorgs/rewinds)"), + ]), + Line::from(vec![ + Span::styled(" White rows ", Style::default().fg(Color::White)), + Span::raw("Normal/existing data"), + ]), + Line::from(""), + Line::from(vec![ + Span::styled("Press ", Style::default().fg(Color::Gray)), + Span::styled( + "?", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::styled(" or ", Style::default().fg(Color::Gray)), + Span::styled( + "F1", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::styled(" to close this help", Style::default().fg(Color::Gray)), + ]), + ]; + + let help_paragraph = Paragraph::new(help_text) + .block( + Block::default() + .borders(Borders::ALL) + .border_style(Style::default().fg(Color::Cyan)) + .title(" Help - Keyboard Shortcuts ") + .title_alignment(Alignment::Center), + ) + .alignment(Alignment::Left) + .wrap(ratatui::widgets::Wrap { trim: false }); + + // Clear the background + frame.render_widget(ratatui::widgets::Clear, popup_area); + frame.render_widget(help_paragraph, popup_area); +}