
Conversation


@zhaohaidao (Contributor) commented Dec 13, 2025

Purpose

Linked issue: close (#92)

It has been running stably for over 10 hours, and the long-term stability test will continue.

Brief change log

Tests

API and Format

Documentation

@zhaohaidao changed the title from "Support s3 as remote segment" to "(WIP)Support s3 as remote segment" on Dec 13, 2025
@zhaohaidao changed the title from "(WIP)Support s3 as remote segment" to "Support s3 as remote segment" on Dec 13, 2025
@zhaohaidao (Contributor, Author)

@luoyuxia Hi yuxia, PTAL if you have time. It has been running stably for over 10 hours, and the long-term stability test will continue.

Copilot AI left a comment

Pull request overview

This PR implements S3 support for remote log segments in Fluss, enabling log data to be stored and retrieved from S3-compatible storage. The implementation includes credential management with caching, proper timeout handling, and client-side projection for remote logs.

Key changes:

  • Added S3 storage backend with configuration via OpenDAL's S3 service (a rough sketch follows this list)
  • Implemented security token-based authentication with credential caching (1-hour TTL)
  • Fixed Arrow IPC metadata padding calculation for correct remote log parsing
  • Added client-side projection support for remote logs with full schema handling
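As a rough illustration of the first two bullets, here is a minimal sketch of building an OpenDAL S3 operator from a property map. The property keys ("bucket", "endpoint", "region", "access_key_id", "secret_access_key") and the helper name `build_s3_operator` are assumptions for illustration, not the PR's actual code; only OpenDAL 0.53's consuming builder API is taken as given.

```rust
use std::collections::HashMap;

use opendal::{services::S3, Operator};

/// Sketch: assemble an OpenDAL S3 operator from credential props.
/// Property key names here are assumptions; the PR's real keys may differ.
fn build_s3_operator(props: &HashMap<String, String>) -> opendal::Result<Operator> {
    let mut builder = S3::default();
    if let Some(bucket) = props.get("bucket") {
        builder = builder.bucket(bucket);
    }
    if let Some(endpoint) = props.get("endpoint") {
        builder = builder.endpoint(endpoint);
    }
    if let Some(region) = props.get("region") {
        builder = builder.region(region);
    }
    if let Some(ak) = props.get("access_key_id") {
        builder = builder.access_key_id(ak);
    }
    if let Some(sk) = props.get("secret_access_key") {
        builder = builder.secret_access_key(sk);
    }
    Ok(Operator::new(builder)?.finish())
}
```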

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| crates/fluss/src/io/storage_s3.rs | New module implementing the S3 configuration builder and path-parsing utilities |
| crates/fluss/src/io/storage.rs | Extended the Storage enum to support S3 with credential properties |
| crates/fluss/src/io/mod.rs | Added conditional compilation for the storage-s3 feature |
| crates/fluss/src/client/credentials.rs | New module implementing a credentials cache with security-token fetching and Hadoop-to-OpenDAL key conversion |
| crates/fluss/src/client/table/scanner.rs | Integrated the credentials cache into LogFetcher for S3 authentication |
| crates/fluss/src/client/table/remote_log.rs | Added S3 props injection, timeout handling, and debug logging for remote downloads |
| crates/fluss/src/client/mod.rs | Exported the credentials module |
| crates/fluss/src/rpc/message/get_security_token.rs | New RPC message for fetching filesystem security tokens |
| crates/fluss/src/rpc/message/mod.rs | Added the get_security_token module and exports |
| crates/fluss/src/rpc/api_key.rs | Added the GetFileSystemSecurityToken API key (1025) |
| crates/fluss/src/record/arrow.rs | Fixed the metadata padding calculation and added record_batch_for_remote_log with full-schema support (see the padding sketch after this table) |
| crates/fluss/src/proto/fluss_api.proto | Added protocol buffer definitions for the security token request/response |
| crates/fluss/Cargo.toml | Added the storage-s3 feature and enabled it by default |
| crates/examples/Cargo.toml | Added dependencies for serde, serde_json, and opendal |
| Cargo.toml | Added workspace dependencies for serde, serde_json, and opendal |
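For context on the arrow.rs row: Arrow's IPC encapsulated-message format pads the flatbuffer metadata so that the message body starts on an 8-byte boundary. A sketch of that rounding rule (the PR's actual fix is not reproduced here):

```rust
/// Round a metadata length up to the next 8-byte boundary, per the
/// Arrow IPC encapsulated-message layout.
fn padded_metadata_len(metadata_len: usize) -> usize {
    (metadata_len + 7) & !7
}

fn main() {
    assert_eq!(padded_metadata_len(13), 16);
    assert_eq!(padded_metadata_len(16), 16);
}
```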


Comment on lines 235 to 239
eprintln!(
"Remote log download: reading chunk {}/{} (offset {})",
chunk_count, total_chunks, offset
);
}

Copilot AI Dec 14, 2025

Using eprintln! for logging download progress is not appropriate for production code. This outputs directly to stderr and cannot be controlled or filtered. Consider using the tracing crate (which is already a dependency of this project) for proper structured logging. This would allow users to control log levels and output destinations.
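For example, the eprintln! call could become a tracing event along these lines (a sketch reusing the variables from the snippet above; the field names are illustrative):

```rust
use tracing::debug;

// Sketch: structured, level-controlled logging instead of raw stderr output.
debug!(
    chunk = chunk_count,
    total = total_chunks,
    offset,
    "remote log download: reading chunk"
);
```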

Comment on lines +125 to +128
let server_node = cluster
.get_coordinator_server()
.or_else(|| Some(cluster.get_one_available_server()))
.expect("no available server to fetch security token");
Copilot AI Dec 14, 2025

Using .expect() with a panic message bypasses proper error handling. If no server is available, this will cause a panic instead of returning a proper error. Consider using .ok_or_else() to return an appropriate Error type instead, allowing the caller to handle this failure gracefully.
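A sketch of the suggested shape; the `Error::NoAvailableServer` variant is hypothetical and only stands in for whatever error type the crate actually defines:

```rust
// Sketch: return an error instead of panicking when no server is available.
// `Error::NoAvailableServer` is a hypothetical variant for illustration.
let server_node = cluster
    .get_coordinator_server()
    .or_else(|| Some(cluster.get_one_available_server()))
    .ok_or_else(|| {
        Error::NoAvailableServer("no available server to fetch security token".to_string())
    })?;
```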

// Get file metadata to know the size
let meta = op.stat(relative_path).await?;
// Timeout for remote storage operations (30 seconds)
const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
Copilot AI Dec 14, 2025

The hardcoded timeout constant is defined locally in the function instead of at the module level or as a configurable constant. Consider moving this to a module-level constant for better maintainability and consistency with the pattern used in storage_s3.rs (lines 32-33) where timeouts are defined at the module level.

Suggested change (remove the function-local constant; see the module-level sketch below):
- const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

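The removed line would then live at module scope, roughly like this (placement is a sketch, mirroring the module-level timeout pattern the comment cites from storage_s3.rs):

```rust
use std::time::Duration;

/// Timeout for remote storage operations (sketch of the module-level placement).
const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
```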
Cargo.toml (outdated diff)

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
opendal = { version = "0.53", features = ["services-s3"] }
Copilot AI Dec 14, 2025

The version specified in Cargo.toml (0.53) doesn't match the version used in the workspace (0.53.3 as seen in crates/fluss/Cargo.toml line 57). This inconsistency could lead to dependency resolution issues. Consider using workspace = true to inherit the version from the workspace root instead of specifying it directly.

Suggested change:
- opendal = { version = "0.53", features = ["services-s3"] }
+ opendal = { version = "0.53.3", features = ["services-s3"] }

Comment on lines +134 to +154
let credentials: Credentials = serde_json::from_slice(&response.token)
.map_err(|e| Error::JsonSerdeError(e.to_string()))?;

let mut addition_infos = HashMap::new();
for kv in &response.addition_info {
addition_infos.insert(kv.key.clone(), kv.value.clone());
}

let cached = CachedToken {
access_key_id: credentials.access_key_id,
secret_access_key: credentials.access_key_secret,
security_token: credentials.security_token,
addition_infos,
cached_at: Instant::now(),
};

let props = cached.to_s3_props();
*self.inner.write() = Some(cached);

Ok(props)
}
Copilot AI Dec 14, 2025

The credentials cache has a fixed TTL of 1 hour (3600 seconds), but the response from the server includes an optional expiration_time field (see fluss_api.proto line 313). Consider using the server-provided expiration time when available instead of using a fixed duration, as the server may issue credentials with different lifetimes based on security policies.
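A sketch of what honoring the server-provided expiration might look like, assuming expiration_time is an optional epoch-milliseconds value (the real field's semantics may differ):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Sketch: derive the cache TTL from the server-provided expiration when
/// present, otherwise fall back to the fixed 1-hour TTL the PR uses today.
fn cache_ttl(expiration_time_ms: Option<i64>) -> Duration {
    match expiration_time_ms {
        Some(expires_at) => {
            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as i64;
            Duration::from_millis(expires_at.saturating_sub(now_ms).max(0) as u64)
        }
        None => Duration::from_secs(3600),
    }
}
```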

Comment on lines +843 to +869
pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
Some(result) => result,
None => return Ok(None),
};

let record_batch = read_record_batch(
&body_buffer,
batch_metadata,
self.full_schema.clone(),
&std::collections::HashMap::new(),
None,
&version,
)?;

let record_batch = match &self.projection {
Some(projection) => {
let projected_columns: Vec<_> = projection
.projected_fields
.iter()
.map(|&idx| record_batch.column(idx).clone())
.collect();
RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
}
None => record_batch,
};
Ok(Some(record_batch))
Copilot AI Dec 14, 2025

The comment describes client-side projection, but the implementation differs from the regular record_batch() method. The regular method applies ordered_schema when projection exists (line 811), while this method always uses full_schema (line 852). This inconsistency could lead to incorrect data parsing when projections are involved. Consider documenting why this difference is necessary or aligning the implementations if the difference is unintentional.
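If the difference turns out to be unintentional, the alignment the comment asks for might look roughly like this; `ordered_schema` is the field the regular record_batch() path reportedly uses, so treat this as a sketch rather than a drop-in fix:

```rust
// Sketch: mirror record_batch() by reading with ordered_schema when a
// projection is present, instead of always using full_schema.
let read_schema = match &self.projection {
    Some(_) => self.ordered_schema.clone(),
    None => self.full_schema.clone(),
};
let record_batch = read_record_batch(
    &body_buffer,
    batch_metadata,
    read_schema,
    &std::collections::HashMap::new(),
    None,
    &version,
)?;
```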

Comment on lines +309 to +311

message GetFileSystemSecurityTokenResponse {
required string schema = 1;
Copilot AI Dec 14, 2025

The new GetFileSystemSecurityTokenResponse message returns a token (parsed client-side as JSON with access_key_id/access_key_secret) over the existing RPC layer, which currently uses a plain TCP Transport::Plain without TLS or any encryption. Anyone with access to the network path between client and server can sniff this RPC and steal cloud credentials, enabling full compromise of the backing object store and its data. Protect this RPC with transport-level encryption (for example by adding TLS to the RPC transport) and/or change the design so that only short-lived, scoped tokens are issued and transmitted instead of raw access keys.

Suggested change:
- message GetFileSystemSecurityTokenResponse {
-   required string schema = 1;
+ // Response containing a short-lived, scoped security token for accessing the file system.
+ // WARNING: Do NOT transmit raw access keys or long-lived credentials in the token field.
+ // Only issue and transmit short-lived, scoped tokens (e.g., session tokens, JWTs, or
+ // cloud provider temporary credentials) with minimal privileges and a short expiration.
+ message GetFileSystemSecurityTokenResponse {
+   required string schema = 1;
+   // Short-lived, scoped security token (e.g., session token, JWT, or temporary credentials).
+   // MUST NOT contain raw access keys or long-lived secrets.

@luoyuxia (Contributor) left a comment

@zhaohaidao Thanks for the PR. Left minor comments. PTAL.

/// Downloader for remote log segment files
pub struct RemoteLogDownloader {
local_log_dir: TempDir,
s3_props: RwLock<HashMap<String, String>>,
Contributor

Here we hard-code the S3 props, but what if the remote storage is not S3?

Contributor Author

My idea here is this: the current implementation follows the principle of simplicity and avoids over-encapsulation. The 's3' prefix should make it clear that this configuration is used by S3, and other remote storages should not use it.

Do you have any suggestions? If you would rather have different RemoteDownloaders implemented for different remote storages, I can modify it.
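One storage-agnostic shape the thread could converge on, purely as a sketch of the alternative being discussed (the field name, scheme keys, and lock type are hypothetical):

```rust
use std::collections::HashMap;

use parking_lot::RwLock; // the PR's actual lock type is not shown here
use tempfile::TempDir;

/// Sketch: key cached properties by storage scheme ("s3", "oss", ...)
/// instead of hard-coding a single S3 map.
pub struct RemoteLogDownloader {
    local_log_dir: TempDir,
    storage_props: RwLock<HashMap<String, HashMap<String, String>>>,
}
```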

if let Some(ref remote_log_fetch_info) =
    fetch_log_for_bucket.remote_log_fetch_info
{
    let s3_props = self
Contributor

It seems we always set the S3 props, but what if the remote storage is not S3?

Ok(op.layer(timeout_layer))
}

pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
Contributor

Seems it's not used?

Contributor Author

It's used in storage.rs:

#[cfg(feature = "storage-s3")]
Storage::S3 { props } => {
    let (bucket, key) = super::parse_s3_path(path);
    let mut s3_props = props.clone();
    s3_props.insert("bucket".to_string(), bucket.to_string());
    let op = super::s3_config_build(&s3_props)?;
    Ok((op, key))
}
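For reference, a plausible shape of parse_s3_path that matches this call site; this is a guess assuming s3://bucket/key style paths, not the PR's actual implementation:

```rust
/// Sketch: split an "s3://bucket/key" path into (bucket, key).
/// The real implementation may handle other schemes or edge cases.
pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
    let stripped = path.strip_prefix("s3://").unwrap_or(path);
    match stripped.split_once('/') {
        Some((bucket, key)) => (bucket, key),
        None => (stripped, ""),
    }
}
```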

