Support s3 as remote segment #93
base: main
Conversation
Force-pushed 244f1d0 to 322390a
Force-pushed 2ff3ad2 to b699799
@luoyuxia Hi, yuxia, PTAL if you have time. It has been running stably for over 10 hours, and the long-term stability test will continue.
Pull request overview
This PR implements S3 support for remote log segments in Fluss, enabling log data to be stored and retrieved from S3-compatible storage. The implementation includes credential management with caching, proper timeout handling, and client-side projection for remote logs.
Key changes:
- Added S3 storage backend with configuration via OpenDAL's S3 service
- Implemented security token-based authentication with credential caching (1-hour TTL)
- Fixed Arrow IPC metadata padding calculation for correct remote log parsing
- Added client-side projection support for remote logs with full schema handling
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| crates/fluss/src/io/storage_s3.rs | New module implementing S3 configuration builder and path parsing utilities |
| crates/fluss/src/io/storage.rs | Extended Storage enum to support S3 with credential properties |
| crates/fluss/src/io/mod.rs | Added conditional compilation for storage-s3 feature |
| crates/fluss/src/client/credentials.rs | New module implementing credentials cache with security token fetching and Hadoop-to-OpenDAL key conversion |
| crates/fluss/src/client/table/scanner.rs | Integrated credentials cache into LogFetcher for S3 authentication |
| crates/fluss/src/client/table/remote_log.rs | Added S3 props injection, timeout handling, and debug logging for remote downloads |
| crates/fluss/src/client/mod.rs | Exported credentials module |
| crates/fluss/src/rpc/message/get_security_token.rs | New RPC message for fetching filesystem security tokens |
| crates/fluss/src/rpc/message/mod.rs | Added get_security_token module and exports |
| crates/fluss/src/rpc/api_key.rs | Added GetFileSystemSecurityToken API key (1025) |
| crates/fluss/src/record/arrow.rs | Fixed metadata padding calculation and added record_batch_for_remote_log with full schema support |
| crates/fluss/src/proto/fluss_api.proto | Added protocol buffer definitions for security token request/response |
| crates/fluss/Cargo.toml | Added storage-s3 feature and enabled it by default |
| crates/examples/Cargo.toml | Added dependencies for serde, serde_json, and opendal |
| Cargo.toml | Added workspace dependencies for serde, serde_json, and opendal |
```rust
eprintln!(
    "Remote log download: reading chunk {}/{} (offset {})",
    chunk_count, total_chunks, offset
);
}
```
Copilot AI · Dec 14, 2025
Using eprintln! for logging download progress is not appropriate for production code. This outputs directly to stderr and cannot be controlled or filtered. Consider using the tracing crate (which is already a dependency of this project) for proper structured logging. This would allow users to control log levels and output destinations.
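For illustration, the same progress line via tracing might look like the sketch below (variable names taken from the hunk above):

```rust
use tracing::debug;

// Level-filtered, structured equivalent of the eprintln! call above.
debug!(
    chunk = chunk_count,
    total = total_chunks,
    offset,
    "remote log download: reading chunk"
);
```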
```rust
let server_node = cluster
    .get_coordinator_server()
    .or_else(|| Some(cluster.get_one_available_server()))
    .expect("no available server to fetch security token");
```
Copilot AI · Dec 14, 2025
Using .expect() with a panic message bypasses proper error handling. If no server is available, this will cause a panic instead of returning a proper error. Consider using .ok_or_else() to return an appropriate Error type instead, allowing the caller to handle this failure gracefully.
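A possible shape for that change, assuming a suitable error variant exists (`Error::UnavailableServer` here is illustrative, not necessarily one defined in this codebase):

```rust
// Return a recoverable error instead of panicking when no server is available.
let server_node = cluster
    .get_coordinator_server()
    .or_else(|| Some(cluster.get_one_available_server()))
    .ok_or_else(|| {
        Error::UnavailableServer("no available server to fetch security token".to_string())
    })?;
```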
```rust
// Get file metadata to know the size
let meta = op.stat(relative_path).await?;
// Timeout for remote storage operations (30 seconds)
const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
```
Copilot AI · Dec 14, 2025
The hardcoded timeout constant is defined locally in the function instead of at the module level or as a configurable constant. Consider moving this to a module-level constant for better maintainability and consistency with the pattern used in storage_s3.rs (lines 32-33) where timeouts are defined at the module level.
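That could be as simple as hoisting the constant, e.g.:

```rust
use std::time::Duration;

// Module-level constant, mirroring the module-level timeouts in storage_s3.rs.
const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
```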
Cargo.toml (Outdated)
```toml
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
opendal = { version = "0.53", features = ["services-s3"] }
```
Copilot AI · Dec 14, 2025
The version specified in Cargo.toml (0.53) doesn't match the version used in the workspace (0.53.3 as seen in crates/fluss/Cargo.toml line 57). This inconsistency could lead to dependency resolution issues. Consider using workspace = true to inherit the version from the workspace root instead of specifying it directly.
Suggested change:
```diff
-opendal = { version = "0.53", features = ["services-s3"] }
+opendal = { version = "0.53.3", features = ["services-s3"] }
```
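The workspace-inheritance alternative mentioned above would look roughly like this, assuming the workspace root pins the version:

```toml
# Workspace root Cargo.toml
[workspace.dependencies]
opendal = { version = "0.53.3", features = ["services-s3"] }

# crates/examples/Cargo.toml
[dependencies]
opendal = { workspace = true }
```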
```rust
let credentials: Credentials = serde_json::from_slice(&response.token)
    .map_err(|e| Error::JsonSerdeError(e.to_string()))?;

let mut addition_infos = HashMap::new();
for kv in &response.addition_info {
    addition_infos.insert(kv.key.clone(), kv.value.clone());
}

let cached = CachedToken {
    access_key_id: credentials.access_key_id,
    secret_access_key: credentials.access_key_secret,
    security_token: credentials.security_token,
    addition_infos,
    cached_at: Instant::now(),
};

let props = cached.to_s3_props();
*self.inner.write() = Some(cached);

Ok(props)
}
```
Copilot AI · Dec 14, 2025
The credentials cache has a fixed TTL of 1 hour (3600 seconds), but the response from the server includes an optional expiration_time field (see fluss_api.proto line 313). Consider using the server-provided expiration time when available instead of using a fixed duration, as the server may issue credentials with different lifetimes based on security policies.
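A rough sketch of honoring a server-provided expiry; the `expiration_time` field is assumed here to be optional epoch milliseconds, which is not verified against the proto:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Prefer the server-provided expiry when present; fall back to the fixed 1-hour TTL.
let ttl = response
    .expiration_time
    .map(|expires_at_ms: u64| {
        let now_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        Duration::from_millis(expires_at_ms.saturating_sub(now_ms))
    })
    .unwrap_or(Duration::from_secs(3600));
```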
```rust
pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
    let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
        Some(result) => result,
        None => return Ok(None),
    };

    let record_batch = read_record_batch(
        &body_buffer,
        batch_metadata,
        self.full_schema.clone(),
        &std::collections::HashMap::new(),
        None,
        &version,
    )?;

    let record_batch = match &self.projection {
        Some(projection) => {
            let projected_columns: Vec<_> = projection
                .projected_fields
                .iter()
                .map(|&idx| record_batch.column(idx).clone())
                .collect();
            RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
        }
        None => record_batch,
    };
    Ok(Some(record_batch))
```
Copilot AI · Dec 14, 2025
The comment describes client-side projection, but the implementation differs from the regular record_batch() method. The regular method applies ordered_schema when projection exists (line 811), while this method always uses full_schema (line 852). This inconsistency could lead to incorrect data parsing when projections are involved. Consider documenting why this difference is necessary or aligning the implementations if the difference is unintentional.
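If the difference is unintentional, the alignment could look roughly like this (`ordered_schema` is referenced from the regular method, not defined in this hunk):

```rust
// Hypothetical: mirror record_batch() by choosing the read schema based on projection.
let read_schema = match &self.projection {
    Some(_) => self.ordered_schema.clone(),
    None => self.full_schema.clone(),
};
```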
```proto
message GetFileSystemSecurityTokenResponse {
  required string schema = 1;
```
Copilot AI · Dec 14, 2025
The new GetFileSystemSecurityTokenResponse message returns a token (parsed client-side as JSON with access_key_id/access_key_secret) over the existing RPC layer, which currently uses a plain TCP Transport::Plain without TLS or any encryption. Anyone with access to the network path between client and server can sniff this RPC and steal cloud credentials, enabling full compromise of the backing object store and its data. Protect this RPC with transport-level encryption (for example by adding TLS to the RPC transport) and/or change the design so that only short-lived, scoped tokens are issued and transmitted instead of raw access keys.
Suggested change:
```proto
// Response containing a short-lived, scoped security token for accessing the file system.
// WARNING: Do NOT transmit raw access keys or long-lived credentials in the token field.
// Only issue and transmit short-lived, scoped tokens (e.g., session tokens, JWTs, or
// cloud provider temporary credentials) with minimal privileges and a short expiration.
message GetFileSystemSecurityTokenResponse {
  required string schema = 1;
  // Short-lived, scoped security token (e.g., session token, JWT, or temporary credentials).
  // MUST NOT contain raw access keys or long-lived secrets.
```
luoyuxia left a comment
@zhaohaidao Thanks for the PR. Left minor comments. PTAL
```rust
/// Downloader for remote log segment files
pub struct RemoteLogDownloader {
    local_log_dir: TempDir,
    s3_props: RwLock<HashMap<String, String>>,
```
Here we hard-code S3 props, but what if the remote storage is not S3?
My idea here is this: the current implementation follows the principle of simplicity and avoids over-encapsulation. The 's3' prefix should make it clear that this configuration is used by S3, and other remote storages should not use it.
Do you have any suggestions? If you'd prefer different RemoteLogDownloaders implemented for different remote storages, I can modify it.
```rust
if let Some(ref remote_log_fetch_info) =
    fetch_log_for_bucket.remote_log_fetch_info
{
    let s3_props = self
```
Seems we always set S3 props, but what if the remote storage is not S3?
```rust
    Ok(op.layer(timeout_layer))
}

pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
```
Seems it's not used?
It's used in storage.rs:

```rust
#[cfg(feature = "storage-s3")]
Storage::S3 { props } => {
    let (bucket, key) = super::parse_s3_path(path);
    let mut s3_props = props.clone();
    s3_props.insert("bucket".to_string(), bucket.to_string());
    let op = super::s3_config_build(&s3_props)?;
    Ok((op, key))
}
```
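For reference, a plausible shape for `parse_s3_path` (the actual body isn't quoted in this thread, so this is only a guess at its behavior):

```rust
// Guessed behavior: split "s3://bucket/path/to/segment" into ("bucket", "path/to/segment").
pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
    let stripped = path.strip_prefix("s3://").unwrap_or(path);
    stripped.split_once('/').unwrap_or((stripped, ""))
}
```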
Purpose
Linked issue: close (#92)
It has been running stably for over 10 hours, and the long-term stability test will continue.

Brief change log
Tests
API and Format
Documentation