Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions crates/fluss/tests/integration/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod table_test {
use crate::integration::utils::create_table;
use arrow::array::record_batch;
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
use fluss::rpc::message::OffsetSpec;
use fluss::row::InternalRow;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -252,4 +253,111 @@ mod table_test {
);
}
}

#[tokio::test]
async fn list_offsets() {
let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.expect("Failed to get admin");

let table_path = TablePath::new("fluss".to_string(), "test_list_offsets".to_string());

let table_descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
.column("id", DataTypes::int())
.column("name", DataTypes::string())
.build()
.expect("Failed to build schema"),
)
.build()
.expect("Failed to build table");

create_table(&admin, &table_path, &table_descriptor).await;

// Wait for table to be fully initialized
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

// Test earliest offset (should be 0 for empty table)
let earliest_offsets = admin
.list_offsets(&table_path, &[0], OffsetSpec::Earliest)
.await
.expect("Failed to list earliest offsets");

assert_eq!(
earliest_offsets.get(&0),
Some(&0),
"Earliest offset should be 0 for bucket 0"
);

// Test latest offset (should be 0 for empty table)
let latest_offsets = admin
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
.await
.expect("Failed to list latest offsets");

assert_eq!(
latest_offsets.get(&0),
Some(&0),
"Latest offset should be 0 for empty table"
);

// Append some records
let append_writer = connection
.get_table(&table_path)
.await
.expect("Failed to get table")
.new_append()
.expect("Failed to create append")
.create_writer();

let batch = record_batch!(
("id", Int32, [1, 2, 3]),
("name", Utf8, ["alice", "bob", "charlie"])
)
.unwrap();
append_writer
.append_arrow_batch(batch)
.await
.expect("Failed to append batch");

// Wait for records to be committed
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;

// Test latest offset after appending (should be 3)
let latest_offsets_after = admin
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
.await
.expect("Failed to list latest offsets after append");

assert_eq!(
latest_offsets_after.get(&0),
Some(&3),
"Latest offset should be 3 after appending 3 records"
);

// Test earliest offset after appending (should still be 0)
let earliest_offsets_after = admin
.list_offsets(&table_path, &[0], OffsetSpec::Earliest)
.await
.expect("Failed to list earliest offsets after append");

assert_eq!(
earliest_offsets_after.get(&0),
Some(&0),
"Earliest offset should still be 0"
);

// Test with multiple buckets
let multi_bucket_offsets = admin
.list_offsets(&table_path, &[0], OffsetSpec::Latest)
.await
.expect("Failed to list offsets for multiple buckets");

assert!(
multi_bucket_offsets.contains_key(&0),
"Should have offset for bucket 0"
);
}
}