From 1b7ae9e47343fe2317d887bd851bd53a356fc5e7 Mon Sep 17 00:00:00 2001 From: Henk-Jan Lebbink Date: Wed, 10 Dec 2025 07:34:49 +0100 Subject: [PATCH] performance optimizations: Signing key caching, stat_object to use HEAD (#197) This PR implements performance optimizations across the MinIO Rust SDK, focusing on reducing latency and improving throughput for high-performance use cases like DataFusion/ObjectStore integration. Key improvements include signing key caching, HTTP/2 optimization, region lookup bypass, and fast-path APIs. Key Changes: Performance optimizations: Signing key caching (4 HMAC ops saved per request), HTTP/2 adaptive window, optimized query/header string building, and a fast-path GET API Bug fixes: Corrected multipart copy logic, changed stat_object to use HEAD method, fixed region handling in tests API enhancements: Added get_object_fast(), into_boxed_stream(), and into_bytes() methods for high-performance scenarios --- CLAUDE.md | 28 +- Cargo.toml | 7 +- src/s3/builders/get_presigned_object_url.rs | 1 + .../get_presigned_policy_form_data.rs | 15 +- src/s3/builders/stat_object.rs | 8 +- src/s3/client/get_region.rs | 122 ++++++ src/s3/client/mod.rs | 347 +++++++++++++++++- src/s3/client/stat_object.rs | 9 +- src/s3/mod.rs | 2 +- src/s3/multimap_ext.rs | 206 +++++++++-- src/s3/response/get_object.rs | 31 ++ src/s3/response_traits.rs | 66 ++++ src/s3/signer.rs | 230 +++++++++++- src/s3/types/minio_error_response.rs | 24 ++ src/s3/utils.rs | 5 +- tests/s3/client_config.rs | 191 ++++++++++ tests/s3/get_object.rs | 127 +++++++ tests/s3/mod.rs | 3 + 18 files changed, 1352 insertions(+), 70 deletions(-) create mode 100644 tests/s3/client_config.rs diff --git a/CLAUDE.md b/CLAUDE.md index c4955bf..bd9cc3d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,7 +2,6 @@ - Only provide actionable feedback. - Exclude code style comments on generated files. These will have a header signifying that. -- Use github markdown folded sections for all items. - Do not use emojis. - Do not add a "feel good" section. @@ -53,6 +52,8 @@ All source files that haven't been generated MUST include the following copyrigh - Avoid obvious comments like `// Set x to 5` for `let x = 5;` - Only add comments when they explain WHY, not WHAT - Document complex algorithms or non-obvious business logic +- **NO historical references** - Never write comments like "Use X instead of Y" or "Replaces old Z" that reference removed code. Future readers won't have context about what was removed. Just describe what the code does now. +- **Use precise terminology** - Use accurate technical terms (e.g., "memoization" for multi-entry caching keyed by input parameters, "cache" for single-value storage). Imprecise terminology confuses readers about actual behavior. ## Critical Code Patterns @@ -110,12 +111,18 @@ impl Client { - Use `Cow<'_, str>` to avoid unnecessary allocations - Prefer iterators over collecting into intermediate vectors - Use `Box` sparingly; prefer generics when possible + - Prefer per-instance state over global statics to support multiple instances with different configurations 5. **Async Patterns** - Use `tokio::select!` for concurrent operations - Avoid blocking operations in async contexts - Use `async-trait` for async trait methods +6. 
**API Documentation** + - Document memory implications for methods that load data into memory + - Point users to streaming alternatives for large data handling + - Be explicit about peak memory usage when relevant + ## Code Quality Principles ### Why Code Quality Standards Are Mandatory @@ -220,18 +227,16 @@ Claude will periodically analyze the codebase and suggest: ### Pre-commit Checklist -**MANDATORY: ALL steps must pass before submitting any PR. No warnings or errors are acceptable.** +**MANDATORY: Run these steps before every commit. No warnings or errors are acceptable.** -Before any code changes: -1. ✅ **Format code**: Run `cargo fmt --all` to fix all formatting issues -2. ✅ **Fix clippy warnings**: Run `cargo clippy --fix --allow-dirty --allow-staged --all-targets` to auto-fix lints -3. ✅ **Verify clippy clean**: Run `cargo clippy --all-targets` and ensure **ZERO warnings** -4. ✅ **Run all tests**: Run `cargo test` to ensure all tests pass -5. ✅ **Build everything**: Run `cargo build --all-targets` to verify all code compiles -6. ✅ **Test coverage**: Ensure new code has appropriate test coverage -7. ✅ **No redundant comments**: Verify no redundant comments are added +1. ✅ **Format code**: `cargo fmt --all` +2. ✅ **Fix clippy warnings**: `cargo clippy --fix --allow-dirty --allow-staged --all-targets` +3. ✅ **Verify clippy clean**: `cargo clippy --all-targets` (must show **ZERO warnings**) +4. ✅ **Run all tests**: `cargo test` +5. ✅ **Run doc tests**: `cargo test --doc` +6. ✅ **Build everything**: `cargo build --all-targets` -**Note:** If clippy shows warnings, you MUST fix them. Use `cargo clippy --fix` or fix manually. +**Note:** If clippy shows warnings, you MUST fix them before committing. ## MinIO Server Setup for Testing @@ -373,6 +378,7 @@ fn operation() -> Result { - **Auto-fix clippy**: `cargo clippy --fix --allow-dirty --allow-staged --all-targets` - **Check clippy**: `cargo clippy --all-targets` (must show zero warnings) - **Run tests**: `cargo test` +- **Run doc tests**: `cargo test --doc` - **Run specific test**: `cargo test test_name` - **Build all**: `cargo build --all-targets` - **Build release**: `cargo build --release` diff --git a/Cargo.toml b/Cargo.toml index c05564d..3c64921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,16 @@ keywords = ["object-storage", "minio", "s3"] categories = ["api-bindings", "web-programming::http-client"] [features] -default = ["default-tls", "default-crypto"] +default = ["default-tls", "default-crypto", "http2"] default-tls = ["reqwest/default-tls"] native-tls = ["reqwest/native-tls"] rustls-tls = ["reqwest/rustls-tls"] default-crypto = ["dep:sha2", "dep:hmac"] +# ring provides faster crypto using assembly optimizations ring = ["dep:ring"] +# HTTP/2 support for improved throughput via multiplexing. +# Gracefully falls back to HTTP/1.1 when the server doesn't support it. 
+http2 = ["reqwest/http2"] localhost = [] [workspace.dependencies] @@ -60,7 +64,6 @@ regex = "1.12" ring = { version = "0.17", optional = true, default-features = false, features = ["alloc"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_yaml = "0.9" sha2 = { version = "0.10", optional = true } urlencoding = "2.1" xmltree = "0.12" diff --git a/src/s3/builders/get_presigned_object_url.rs b/src/s3/builders/get_presigned_object_url.rs index 5a53375..ca4fe81 100644 --- a/src/s3/builders/get_presigned_object_url.rs +++ b/src/s3/builders/get_presigned_object_url.rs @@ -100,6 +100,7 @@ impl GetPresignedObjectUrl { }; presign_v4( + &self.client.shared.signing_key_cache, &self.method, &url.host_header_value(), &url.path, diff --git a/src/s3/builders/get_presigned_policy_form_data.rs b/src/s3/builders/get_presigned_policy_form_data.rs index d5d46be..eace5ea 100644 --- a/src/s3/builders/get_presigned_policy_form_data.rs +++ b/src/s3/builders/get_presigned_policy_form_data.rs @@ -17,12 +17,13 @@ use crate::s3::client::MinioClient; use crate::s3::creds::Credentials; use crate::s3::error::{Error, ValidationErr}; use crate::s3::header_constants::*; -use crate::s3::signer::post_presign_v4; +use crate::s3::signer::{SigningKeyCache, post_presign_v4}; use crate::s3::utils::{ UtcTime, b64_encode, check_bucket_name, to_amz_date, to_iso8601utc, to_signer_date, utc_now, }; use serde_json::{Value, json}; use std::collections::HashMap; +use std::sync::RwLock; use typed_builder::TypedBuilder; /// Argument builder for generating presigned POST policy for the [`POST Object`](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html) S3 API operation. @@ -46,6 +47,7 @@ impl GetPresignedPolicyFormData { let creds: Credentials = self.client.shared.provider.as_ref().unwrap().fetch(); self.policy .form_data( + &self.client.shared.signing_key_cache, creds.access_key, creds.secret_key, creds.session_token, @@ -293,8 +295,9 @@ impl PostPolicy { /// Generates form data for given access/secret keys, optional session token and region. /// The returned map contains `x-amz-algorithm`, `x-amz-credential`, `x-amz-security-token`, `x-amz-date`, `policy` and `x-amz-signature` keys and values. - pub fn form_data( + pub(crate) fn form_data( &self, + signing_key_cache: &RwLock, access_key: String, secret_key: String, session_token: Option, @@ -354,7 +357,13 @@ impl PostPolicy { }); let encoded_policy = b64_encode(policy.to_string()); - let signature = post_presign_v4(&encoded_policy, &secret_key, date, ®ion); + let signature = post_presign_v4( + signing_key_cache, + &encoded_policy, + &secret_key, + date, + ®ion, + ); let mut data: HashMap = HashMap::new(); data.insert(X_AMZ_ALGORITHM.into(), PostPolicy::ALGORITHM.to_string()); diff --git a/src/s3/builders/stat_object.rs b/src/s3/builders/stat_object.rs index f1ee0ee..5d4e798 100644 --- a/src/s3/builders/stat_object.rs +++ b/src/s3/builders/stat_object.rs @@ -30,6 +30,12 @@ use typed_builder::TypedBuilder; /// Argument builder for the [`HeadObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html) S3 API operation. /// /// This struct constructs the parameters required for the [`Client::stat_object`](crate::s3::client::MinioClient::stat_object) method. +/// +/// # HTTP Method +/// +/// This operation uses the HTTP HEAD method, which retrieves object metadata +/// without transferring the object body. This is more efficient than GET when +/// you only need metadata (size, ETag, Content-Type, Last-Modified, etc.). 
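+///
+/// # Example
+///
+/// A minimal sketch of a metadata lookup; the bucket and object names below
+/// are placeholders, and error handling is elided.
+///
+/// ```no_run
+/// # use minio::s3::MinioClient;
+/// # use minio::s3::types::S3Api;
+/// # async fn example(client: MinioClient) {
+/// // HEAD request: returns metadata only, no object body is transferred.
+/// let resp = client
+///     .stat_object("my-bucket", "my-object")
+///     .build()
+///     .send()
+///     .await
+///     .unwrap();
+/// let _size = resp.size().unwrap();
+/// # }
+/// ```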
#[derive(Debug, Clone, TypedBuilder)] pub struct StatObject { #[builder(!default)] // force required @@ -115,7 +121,7 @@ impl ToS3Request for StatObject { Ok(S3Request::builder() .client(self.client) - .method(Method::GET) + .method(Method::HEAD) .region(self.region) .bucket(self.bucket) .object(self.object) diff --git a/src/s3/client/get_region.rs b/src/s3/client/get_region.rs index 865d4c5..39f4841 100644 --- a/src/s3/client/get_region.rs +++ b/src/s3/client/get_region.rs @@ -52,11 +52,19 @@ impl MinioClient { /// Retrieves the region for the specified bucket name from the cache. /// If the region is not found in the cache, it is fetched via a call to S3 or MinIO /// and then stored in the cache for future lookups. + /// + /// If `skip_region_lookup` is enabled on the client, this method returns + /// the default region immediately without making any network calls. pub async fn get_region_cached>( &self, bucket: S, region: &Option, // the region as provided by the S3Request ) -> Result { + // If skip_region_lookup is enabled (for MinIO servers), return default region immediately + if self.shared.skip_region_lookup { + return Ok(DEFAULT_REGION.to_owned()); + } + // If a region is provided, validate it against the base_url region if let Some(requested_region) = region { if !self.shared.base_url.region.is_empty() @@ -109,3 +117,117 @@ impl MinioClient { Ok(resolved_region) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::s3::client::MinioClientBuilder; + use crate::s3::creds::StaticProvider; + use crate::s3::http::BaseUrl; + + fn create_test_client(skip_region_lookup: bool) -> MinioClient { + let base_url: BaseUrl = "http://localhost:9000".parse().unwrap(); + MinioClientBuilder::new(base_url) + .provider(Some(StaticProvider::new("test", "test", None))) + .skip_region_lookup(skip_region_lookup) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_skip_region_lookup_returns_default_region() { + let client = create_test_client(true); + + // With skip_region_lookup enabled, should return default region immediately + let region = client.get_region_cached("any-bucket", &None).await.unwrap(); + + assert_eq!(region, DEFAULT_REGION); + } + + #[tokio::test] + async fn test_skip_region_lookup_ignores_provided_region() { + let client = create_test_client(true); + + // Even with a provided region, skip_region_lookup should return default + let region = client + .get_region_cached("any-bucket", &Some("eu-west-1".to_string())) + .await + .unwrap(); + + // skip_region_lookup takes precedence and returns default region + assert_eq!(region, DEFAULT_REGION); + } + + #[tokio::test] + async fn test_skip_region_lookup_multiple_calls_return_same_region() { + let client = create_test_client(true); + + // Multiple calls should consistently return the default region + for bucket in ["bucket1", "bucket2", "bucket3"] { + let region = client.get_region_cached(bucket, &None).await.unwrap(); + assert_eq!(region, DEFAULT_REGION); + } + } + + #[tokio::test] + async fn test_without_skip_region_lookup_uses_provided_region() { + let client = create_test_client(false); + + // Without skip_region_lookup, provided region should be used + let region = client + .get_region_cached("any-bucket", &Some("eu-west-1".to_string())) + .await + .unwrap(); + + assert_eq!(region, "eu-west-1"); + } + + #[tokio::test] + async fn test_without_skip_region_lookup_empty_bucket_returns_default() { + let client = create_test_client(false); + + // Empty bucket name should return default region + let region = 
client.get_region_cached("", &None).await.unwrap(); + + assert_eq!(region, DEFAULT_REGION); + } + + #[test] + fn test_skip_region_lookup_builder_default_is_false() { + let base_url: BaseUrl = "http://localhost:9000".parse().unwrap(); + let client = MinioClientBuilder::new(base_url) + .provider(Some(StaticProvider::new("test", "test", None))) + .build() + .unwrap(); + + // Default should be false (region lookup enabled) + assert!(!client.shared.skip_region_lookup); + } + + #[test] + fn test_skip_region_lookup_builder_can_be_enabled() { + let base_url: BaseUrl = "http://localhost:9000".parse().unwrap(); + let client = MinioClientBuilder::new(base_url) + .provider(Some(StaticProvider::new("test", "test", None))) + .skip_region_lookup(true) + .build() + .unwrap(); + + assert!(client.shared.skip_region_lookup); + } + + #[test] + fn test_skip_region_lookup_builder_can_be_toggled() { + let base_url: BaseUrl = "http://localhost:9000".parse().unwrap(); + + // Enable then disable + let client = MinioClientBuilder::new(base_url) + .provider(Some(StaticProvider::new("test", "test", None))) + .skip_region_lookup(true) + .skip_region_lookup(false) + .build() + .unwrap(); + + assert!(!client.shared.skip_region_lookup); + } +} diff --git a/src/s3/client/mod.rs b/src/s3/client/mod.rs index 80fc41d..760d90f 100644 --- a/src/s3/client/mod.rs +++ b/src/s3/client/mod.rs @@ -13,7 +13,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! S3 client to perform bucket and object operations +//! S3 client to perform bucket and object operations. +//! +//! # HTTP Version Support +//! +//! The client supports both HTTP/1.1 and HTTP/2. When connecting over TLS, +//! the client will negotiate HTTP/2 via ALPN if the server supports it, +//! otherwise it falls back to HTTP/1.1 gracefully. HTTP/2 provides better +//! throughput for parallel requests through multiplexing. +//! +//! HTTP/2 support is enabled by default via the `http2` feature flag. For +//! HTTP/1.1-only legacy S3-compatible services, you can disable it: +//! +//! ```toml +//! [dependencies] +//! minio = { version = "0.3", default-features = false, features = ["default-tls", "default-crypto"] } +//! ``` use bytes::Bytes; use dashmap::DashMap; @@ -26,7 +41,7 @@ use std::fs::File; use std::io::prelude::*; use std::mem; use std::path::{Path, PathBuf}; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, OnceLock, RwLock}; use uuid::Uuid; use crate::s3::builders::{BucketExists, ComposeSource}; @@ -42,7 +57,7 @@ use crate::s3::multimap_ext::{Multimap, MultimapExt}; use crate::s3::response::*; use crate::s3::response_traits::{HasEtagFromHeaders, HasS3Fields}; use crate::s3::segmented_bytes::SegmentedBytes; -use crate::s3::signer::sign_v4_s3; +use crate::s3::signer::{SigningKeyCache, sign_v4_s3}; use crate::s3::utils::{EMPTY_SHA256, check_ssec_with_log, sha256_hash_sb, to_amz_date, utc_now}; mod append_object; @@ -143,6 +158,103 @@ impl Iterator for BodyIterator { /// exceeds this count, each part must be larger to remain within the limit. pub const MAX_MULTIPART_COUNT: u16 = 10_000; +/// Configuration for the HTTP connection pool. 
+/// +/// These settings allow tuning the client for different workloads: +/// - **High-throughput**: Increase `max_idle_per_host` and `idle_timeout` +/// - **Low-latency**: Enable `tcp_nodelay` (default) +/// - **Resource-constrained**: Reduce `max_idle_per_host` and `idle_timeout` +/// +/// # Example +/// +/// ``` +/// use minio::s3::client::ConnectionPoolConfig; +/// use std::time::Duration; +/// +/// // High-throughput configuration +/// let config = ConnectionPoolConfig::default() +/// .max_idle_per_host(64) +/// .idle_timeout(Duration::from_secs(120)); +/// +/// // Resource-constrained configuration +/// let config = ConnectionPoolConfig::default() +/// .max_idle_per_host(4) +/// .idle_timeout(Duration::from_secs(30)); +/// ``` +#[derive(Debug, Clone)] +pub struct ConnectionPoolConfig { + /// Maximum number of idle connections per host. + /// + /// Higher values allow more parallel requests but consume more memory. + /// Default: 32 (optimized for parallel S3 operations) + pub max_idle_per_host: usize, + + /// How long idle connections are kept in the pool. + /// + /// Longer timeouts reduce reconnection overhead but increase memory usage. + /// Default: 90 seconds + pub idle_timeout: std::time::Duration, + + /// TCP keepalive interval. + /// + /// Helps detect dead connections and keeps connections alive through NAT/firewalls. + /// Default: 60 seconds + pub tcp_keepalive: std::time::Duration, + + /// Enable TCP_NODELAY (disable Nagle's algorithm). + /// + /// Reduces latency for small requests but may reduce throughput on + /// high-bandwidth, high-latency links. Default: true + pub tcp_nodelay: bool, +} + +impl Default for ConnectionPoolConfig { + fn default() -> Self { + Self { + max_idle_per_host: 32, + idle_timeout: std::time::Duration::from_secs(90), + tcp_keepalive: std::time::Duration::from_secs(60), + tcp_nodelay: true, + } + } +} + +impl ConnectionPoolConfig { + /// Set the maximum number of idle connections per host. + /// + /// Higher values allow more parallel requests but consume more memory. + /// Typical values: 2-8 for light usage, 16-64 for heavy parallel workloads. + pub fn max_idle_per_host(mut self, max: usize) -> Self { + self.max_idle_per_host = max; + self + } + + /// Set how long idle connections are kept in the pool. + /// + /// Longer timeouts reduce reconnection overhead but increase memory usage. + pub fn idle_timeout(mut self, timeout: std::time::Duration) -> Self { + self.idle_timeout = timeout; + self + } + + /// Set the TCP keepalive interval. + /// + /// Helps detect dead connections and keeps connections alive through NAT/firewalls. + pub fn tcp_keepalive(mut self, interval: std::time::Duration) -> Self { + self.tcp_keepalive = interval; + self + } + + /// Enable or disable TCP_NODELAY (Nagle's algorithm). + /// + /// When enabled (default), reduces latency for small requests. + /// Disable for better throughput on high-bandwidth, high-latency links. + pub fn tcp_nodelay(mut self, enable: bool) -> Self { + self.tcp_nodelay = enable; + self + } +} + /// Client Builder manufactures a Client using given parameters. /// Creates a builder given a base URL for the MinIO service or other AWS S3 /// compatible object storage service. @@ -158,6 +270,10 @@ pub struct MinioClientBuilder { ignore_cert_check: Option, /// Set the app info as an Option of (app_name, app_version) pair. This will show up in the client's user-agent. app_info: Option<(String, String)>, + /// Skip region lookup for MinIO servers (region is not used by MinIO). 
+ skip_region_lookup: bool, + /// HTTP connection pool configuration. + connection_pool_config: ConnectionPoolConfig, } impl MinioClientBuilder { @@ -171,6 +287,8 @@ impl MinioClientBuilder { ssl_cert_file: None, ignore_cert_check: None, app_info: None, + skip_region_lookup: false, + connection_pool_config: ConnectionPoolConfig::default(), } } @@ -208,9 +326,81 @@ impl MinioClientBuilder { self } + /// Skip region lookup for MinIO servers. + /// + /// MinIO does not use AWS regions, so region lookup is unnecessary overhead. + /// When enabled, the client will use the default region ("us-east-1") for + /// all requests without making network calls to determine the bucket region. + /// + /// This improves performance by eliminating the first-request latency penalty + /// caused by region discovery. + /// + /// # Example + /// + /// ```no_run + /// use minio::s3::client::MinioClientBuilder; + /// use minio::s3::creds::StaticProvider; + /// use minio::s3::http::BaseUrl; + /// + /// let base_url: BaseUrl = "http://localhost:9000".parse().unwrap(); + /// let client = MinioClientBuilder::new(base_url) + /// .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None))) + /// .skip_region_lookup(true) + /// .build() + /// .unwrap(); + /// ``` + pub fn skip_region_lookup(mut self, skip: bool) -> Self { + self.skip_region_lookup = skip; + self + } + + /// Configure the HTTP connection pool settings. + /// + /// Allows tuning the client for different workloads (high-throughput, + /// low-latency, or resource-constrained environments). + /// + /// # Example + /// + /// ```no_run + /// use minio::s3::client::{MinioClientBuilder, ConnectionPoolConfig}; + /// use minio::s3::creds::StaticProvider; + /// use minio::s3::http::BaseUrl; + /// use std::time::Duration; + /// + /// let base_url: BaseUrl = "http://localhost:9000".parse().unwrap(); + /// + /// // High-throughput configuration for parallel uploads + /// let client = MinioClientBuilder::new(base_url) + /// .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None))) + /// .connection_pool_config( + /// ConnectionPoolConfig::default() + /// .max_idle_per_host(64) + /// .idle_timeout(Duration::from_secs(120)) + /// ) + /// .build() + /// .unwrap(); + /// ``` + pub fn connection_pool_config(mut self, config: ConnectionPoolConfig) -> Self { + self.connection_pool_config = config; + self + } + /// Build the Client. pub fn build(self) -> Result { - let mut builder = reqwest::Client::builder().no_gzip(); + let pool_config = &self.connection_pool_config; + let mut builder = reqwest::Client::builder() + .no_gzip() + .tcp_nodelay(pool_config.tcp_nodelay) + .tcp_keepalive(pool_config.tcp_keepalive) + .pool_max_idle_per_host(pool_config.max_idle_per_host) + .pool_idle_timeout(pool_config.idle_timeout); + + // HTTP/2 adaptive window improves throughput when server supports HTTP/2. + // Has no effect with HTTP/1.1-only servers (graceful fallback). 
+ #[cfg(feature = "http2")] + { + builder = builder.http2_adaptive_window(true); + } let mut user_agent = String::from("MinIO (") + std::env::consts::OS @@ -257,6 +447,8 @@ impl MinioClientBuilder { client_hooks: self.client_hooks, region_map: Default::default(), express: Default::default(), + skip_region_lookup: self.skip_region_lookup, + signing_key_cache: RwLock::new(SigningKeyCache::new()), }), }) } @@ -537,6 +729,7 @@ impl MinioClient { headers.add(X_AMZ_SECURITY_TOKEN, creds.session_token.unwrap()); } sign_v4_s3( + &self.shared.signing_key_cache, method, &url.path, region, @@ -729,6 +922,145 @@ impl MinioClient { Ok(()) } + /// Fast-path GET request that bypasses the general S3 API overhead. + /// + /// This method is optimized for high-performance object retrieval scenarios + /// like DataFusion/ObjectStore integration where minimal latency is critical. + /// + /// Returns the raw reqwest Response for direct stream access. + /// + /// # Arguments + /// * `bucket` - The bucket name (validated) + /// * `object` - The object key (validated) + /// * `range` - Optional byte range as (offset, length). If length is 0 or None, reads from offset to end. + /// + /// # Important Limitations + /// + /// This method bypasses several standard client features for performance: + /// + /// - **No hooks**: Client hooks registered via [`MinioClientBuilder::hook`] are NOT called. + /// This means custom authentication, logging, metrics, or request modification will not apply. + /// - **ALWAYS skips region lookup**: Unconditionally uses the default region ("us-east-1"), + /// **ignoring** the client's [`skip_region_lookup`](MinioClientBuilder::skip_region_lookup) setting. + /// This is correct for MinIO servers but **WILL FAIL** for AWS S3 buckets in non-default regions. + /// If your client is configured with `skip_region_lookup(false)` expecting region lookups to work, + /// this method will silently bypass that configuration and use "us-east-1" anyway. + /// - **No extra headers**: Does not add custom headers that might be configured elsewhere. 
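+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of a ranged fast-path read; the bucket and object
+    /// names below are placeholders, and error handling is elided.
+    ///
+    /// ```no_run
+    /// # use minio::s3::MinioClient;
+    /// # async fn example(client: MinioClient) {
+    /// // Read the first 1024 bytes of the object via the fast path.
+    /// let resp = client
+    ///     .get_object_fast("my-bucket", "my-object", Some((0, Some(1024))))
+    ///     .await
+    ///     .unwrap();
+    /// // The raw reqwest::Response gives direct access to the body.
+    /// let _bytes = resp.bytes().await.unwrap();
+    /// # }
+    /// ```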
+ /// + /// # When to Use + /// + /// Use this method when: + /// - You need maximum throughput for bulk data retrieval + /// - You're integrating with systems like Apache Arrow/DataFusion + /// - You've already validated bucket/object names upstream + /// - You don't need hook functionality (logging, metrics, custom auth) + /// + /// # When NOT to Use + /// + /// Use the standard [`get_object`](MinioClient::get_object) API when: + /// - You need hook support for authentication, logging, or monitoring + /// - You're working with AWS S3 buckets that may be in non-default regions + /// - Your client has `skip_region_lookup(false)` and expects region lookups to work + /// - You want the full feature set of the SDK + /// + /// # Errors + /// + /// Returns an error if: + /// - Bucket name is invalid (same validation as standard API) + /// - Object name is invalid (same validation as standard API) + /// - The server returns a non-success status code + pub async fn get_object_fast( + &self, + bucket: &str, + object: &str, + range: Option<(u64, Option)>, + ) -> Result { + use crate::s3::utils::{check_bucket_name, check_object_name}; + + // Validate inputs (same as standard API) + check_bucket_name(bucket, true)?; + check_object_name(object)?; + + // Use default region (skip region lookup for performance) + let region = DEFAULT_REGION; + + // Build URL directly (no query params for GET) + let url = self.shared.base_url.build_url( + &Method::GET, + region, + &Multimap::new(), + Some(bucket), + Some(object), + )?; + + // Build headers in Multimap (single source of truth) + let date = utc_now(); + let mut headers = Multimap::new(); + headers.add(HOST, url.host_header_value()); + headers.add(X_AMZ_DATE, to_amz_date(date)); + headers.add(X_AMZ_CONTENT_SHA256, EMPTY_SHA256); + + // Add range header if specified + if let Some((offset, length)) = range { + let range_str = match length { + Some(len) if len > 0 => format!("bytes={}-{}", offset, offset + len - 1), + _ => format!("bytes={}-", offset), + }; + headers.add(RANGE, range_str); + } + + // Sign the request if we have credentials + if let Some(provider) = &self.shared.provider { + let creds = provider.fetch(); + if let Some(token) = &creds.session_token { + headers.add(X_AMZ_SECURITY_TOKEN, token); + } + + sign_v4_s3( + &self.shared.signing_key_cache, + &Method::GET, + &url.path, + region, + &mut headers, + &Multimap::new(), + &creds.access_key, + &creds.secret_key, + EMPTY_SHA256, + date, + ); + } + + // Build reqwest request and transfer all headers + let mut req = self.http_client.get(url.to_string()); + for (key, values) in headers.iter_all() { + for value in values { + req = req.header(key, value); + } + } + + // Send request + let resp = req.send().await.map_err(ValidationErr::from)?; + + if resp.status().is_success() { + return Ok(resp); + } + + // Handle error response + let status = resp.status(); + Err(Error::S3Server(S3ServerError::S3Error(Box::new( + MinioErrorResponse::from_status_and_message( + status.as_u16(), + format!( + "GET object failed with status {} ({}): {}/{}", + status.as_u16(), + status.canonical_reason().unwrap_or("Unknown"), + bucket, + object + ), + ), + )))) + } + /// create an example client for testing on localhost #[cfg(feature = "localhost")] pub fn create_client_on_localhost() @@ -745,13 +1077,18 @@ impl MinioClient { } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub(crate) struct SharedClientItems { pub(crate) base_url: BaseUrl, pub(crate) provider: Option>, client_hooks: Vec>, region_map: DashMap, express: 
OnceLock, + pub(crate) skip_region_lookup: bool, + /// Cached precomputation of AWS Signature V4 signing keys. + /// Stored per-client to support multiple clients with different credentials + /// in the same process. + pub(crate) signing_key_cache: RwLock, } impl SharedClientItems { diff --git a/src/s3/client/stat_object.rs b/src/s3/client/stat_object.rs index 4eeb511..f93efa7 100644 --- a/src/s3/client/stat_object.rs +++ b/src/s3/client/stat_object.rs @@ -17,10 +17,15 @@ use crate::s3::builders::{StatObject, StatObjectBldr}; use crate::s3::client::MinioClient; impl MinioClient { - /// Creates a [`StatObject`] request builder. Given a bucket and object name, return some statistics. + /// Creates a [`StatObject`] request builder to retrieve object metadata. + /// + /// This operation uses the HTTP HEAD method (S3 HeadObject API) to efficiently + /// retrieve object metadata without downloading the object body. This is the + /// standard and most efficient way to check if an object exists and get its + /// metadata (size, ETag, Content-Type, user metadata, etc.). /// /// To execute the request, call [`StatObject::send()`](crate::s3::types::S3Api::send), - /// which returns a [`Result`] containing a [`StatObjectResponse`](crate::s3::response::StatObjectResponse). + /// which returns a [`Result`] containing a [`StatObjectResponse`](crate::s3::response::StatObjectResponse). /// /// # Example /// diff --git a/src/s3/mod.rs b/src/s3/mod.rs index f238a1d..4cf2a9f 100644 --- a/src/s3/mod.rs +++ b/src/s3/mod.rs @@ -31,5 +31,5 @@ pub mod types; pub mod utils; // Re-export types module contents for convenience -pub use client::{MinioClient, MinioClientBuilder}; +pub use client::{ConnectionPoolConfig, MinioClient, MinioClientBuilder}; pub use types::{header_constants, lifecycle_config, minio_error_response, sse}; diff --git a/src/s3/multimap_ext.rs b/src/s3/multimap_ext.rs index ce0f8c8..8796e13 100644 --- a/src/s3/multimap_ext.rs +++ b/src/s3/multimap_ext.rs @@ -14,13 +14,38 @@ // limitations under the License. use crate::s3::utils::url_encode; -use lazy_static::lazy_static; -use regex::Regex; +use std::borrow::Cow; use std::collections::BTreeMap; /// Multimap for string key and string value pub type Multimap = multimap::MultiMap; +/// Collapses multiple spaces into a single space (avoids regex overhead). +/// +/// Returns `Cow::Borrowed` when no transformation is needed (common case), +/// avoiding allocation for header values that don't contain consecutive spaces. 
+#[inline] +fn collapse_spaces(s: &str) -> Cow<'_, str> { + let trimmed = s.trim(); + if !trimmed.contains(" ") { + return Cow::Borrowed(trimmed); + } + let mut result = String::with_capacity(trimmed.len()); + let mut prev_space = false; + for c in trimmed.chars() { + if c == ' ' { + if !prev_space { + result.push(' '); + prev_space = true; + } + } else { + result.push(c); + prev_space = false; + } + } + Cow::Owned(result) +} + pub trait MultimapExt { /// Adds a key-value pair to the multimap fn add, V: Into>(&mut self, key: K, value: V); @@ -76,60 +101,77 @@ impl MultimapExt for Multimap { } fn get_canonical_query_string(&self) -> String { - let mut keys: Vec = Vec::new(); - for (key, _) in self.iter() { - keys.push(key.to_string()); - } - keys.sort(); + // Use BTreeMap for automatic sorting (avoids explicit sort) + let mut sorted: BTreeMap<&str, Vec<&str>> = BTreeMap::new(); + let mut total_len = 0usize; - let mut query = String::new(); - for key in keys { - match self.get_vec(key.as_str()) { - Some(values) => { - for value in values { - if !query.is_empty() { - query.push('&'); - } - query.push_str(&url_encode(key.as_str())); - query.push('='); - query.push_str(&url_encode(value)); - } + for (key, values) in self.iter_all() { + for value in values { + // Pre-calculate total length to avoid reallocations. + // Most S3 query params are alphanumeric (uploadId, partNumber, versionId) + // so we use actual length + 20% buffer for occasional URL encoding. + total_len += key.len() + 1 + value.len() + 2; // key=value& + } + sorted + .entry(key.as_str()) + .or_default() + .extend(values.iter().map(|s| s.as_str())); + } + + // Add 20% buffer for URL encoding overhead + let mut query = String::with_capacity(total_len + total_len / 5); + for (key, values) in sorted { + for value in values { + if !query.is_empty() { + query.push('&'); } - None => todo!(), // This never happens. - }; + query.push_str(&url_encode(key)); + query.push('='); + query.push_str(&url_encode(value)); + } } query } fn get_canonical_headers(&self) -> (String, String) { - lazy_static! 
{ - static ref MULTI_SPACE_REGEX: Regex = Regex::new("( +)").unwrap(); - } + // Use BTreeMap for automatic sorting (avoids explicit sort) let mut btmap: BTreeMap = BTreeMap::new(); + // Pre-calculate sizes for better allocation + let mut key_bytes = 0usize; + let mut value_bytes = 0usize; + for (k, values) in self.iter_all() { let key = k.to_lowercase(); - if "authorization" == key || "user-agent" == key { + if key == "authorization" || key == "user-agent" { continue; } - let mut vs = values.clone(); + // Sort values in place if needed + let mut vs: Vec<&String> = values.iter().collect(); vs.sort(); - let mut value = String::new(); + let mut value = + String::with_capacity(vs.iter().map(|v| v.len()).sum::() + vs.len()); for v in vs { if !value.is_empty() { value.push(','); } - let s: String = MULTI_SPACE_REGEX.replace_all(&v, " ").trim().to_string(); - value.push_str(&s); + value.push_str(&collapse_spaces(v)); } - btmap.insert(key.clone(), value.clone()); + + key_bytes += key.len(); + value_bytes += value.len(); + btmap.insert(key, value); } - let mut signed_headers = String::new(); - let mut canonical_headers = String::new(); + // Pre-allocate output strings + let header_count = btmap.len(); + let mut signed_headers = String::with_capacity(key_bytes + header_count); + let mut canonical_headers = + String::with_capacity(key_bytes + value_bytes + header_count * 2); + let mut add_delim = false; for (key, value) in &btmap { if add_delim { @@ -149,3 +191,103 @@ impl MultimapExt for Multimap { (signed_headers, canonical_headers) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_collapse_spaces_no_consecutive_spaces() { + // Should return Cow::Borrowed (no allocation) + let result = collapse_spaces("hello world"); + assert_eq!(result, "hello world"); + assert!(matches!(result, Cow::Borrowed(_))); + } + + #[test] + fn test_collapse_spaces_with_consecutive_spaces() { + // Should return Cow::Owned with spaces collapsed + let result = collapse_spaces("hello world"); + assert_eq!(result, "hello world"); + assert!(matches!(result, Cow::Owned(_))); + + let result = collapse_spaces("hello world"); + assert_eq!(result, "hello world"); + + let result = collapse_spaces("a b c d"); + assert_eq!(result, "a b c d"); + } + + #[test] + fn test_collapse_spaces_multiple_groups() { + let result = collapse_spaces("hello world foo bar"); + assert_eq!(result, "hello world foo bar"); + } + + #[test] + fn test_collapse_spaces_leading_trailing() { + // Leading and trailing spaces should be trimmed + let result = collapse_spaces(" hello world "); + assert_eq!(result, "hello world"); + assert!(matches!(result, Cow::Borrowed(_))); + + let result = collapse_spaces(" hello world "); + assert_eq!(result, "hello world"); + assert!(matches!(result, Cow::Owned(_))); + } + + #[test] + fn test_collapse_spaces_only_spaces() { + let result = collapse_spaces(" "); + assert_eq!(result, ""); + assert!(matches!(result, Cow::Borrowed(_))); + } + + #[test] + fn test_collapse_spaces_empty_string() { + let result = collapse_spaces(""); + assert_eq!(result, ""); + assert!(matches!(result, Cow::Borrowed(_))); + } + + #[test] + fn test_collapse_spaces_single_space() { + let result = collapse_spaces(" "); + assert_eq!(result, ""); + assert!(matches!(result, Cow::Borrowed(_))); + } + + #[test] + fn test_collapse_spaces_no_spaces() { + let result = collapse_spaces("helloworld"); + assert_eq!(result, "helloworld"); + assert!(matches!(result, Cow::Borrowed(_))); + } + + #[test] + fn test_collapse_spaces_tabs_not_collapsed() { + 
// Only spaces are collapsed, not tabs + let result = collapse_spaces("hello\t\tworld"); + assert_eq!(result, "hello\t\tworld"); + assert!(matches!(result, Cow::Borrowed(_))); + } + + #[test] + fn test_collapse_spaces_mixed_whitespace() { + // Tabs and spaces mixed - only consecutive spaces collapsed + let result = collapse_spaces("hello \t world"); + assert_eq!(result, "hello \t world"); + } + + #[test] + fn test_collapse_spaces_realistic_header_value() { + // Realistic header value that should not need modification + let result = collapse_spaces("application/json"); + assert_eq!(result, "application/json"); + assert!(matches!(result, Cow::Borrowed(_))); + + let result = collapse_spaces("bytes=0-1023"); + assert_eq!(result, "bytes=0-1023"); + assert!(matches!(result, Cow::Borrowed(_))); + } +} diff --git a/src/s3/response/get_object.rs b/src/s3/response/get_object.rs index 01dcfd1..ca22b1b 100644 --- a/src/s3/response/get_object.rs +++ b/src/s3/response/get_object.rs @@ -23,6 +23,13 @@ use bytes::Bytes; use futures_util::TryStreamExt; use http::HeaderMap; use std::mem; +use std::pin::Pin; + +/// Type alias for a boxed byte stream with size, used by [`GetObjectResponse::into_boxed_stream`]. +pub type BoxedByteStream = ( + Pin> + Send>>, + u64, +); pub struct GetObjectResponse { request: S3Request, @@ -47,6 +54,30 @@ impl GetObjectResponse { Ok(ObjectContent::new_from_stream(body, Some(content_length))) } + /// Returns the content as a boxed stream for direct streaming access. + /// + /// This is more efficient than `content().to_stream().await` for scenarios + /// requiring minimal overhead, as it bypasses the async wrapper entirely. + /// Use this for high-throughput scenarios like DataFusion queries. + pub fn into_boxed_stream(self) -> Result { + let content_length = self.object_size()?; + let stream = Box::pin(self.resp.bytes_stream().map_err(std::io::Error::other)); + Ok((stream, content_length)) + } + + /// Consumes the response and returns all content as bytes. + /// + /// **Memory usage**: This loads the entire object into memory. For objects + /// larger than available RAM, this may cause out-of-memory errors. For large + /// objects, use [`into_boxed_stream`](Self::into_boxed_stream) to process + /// data incrementally. + pub async fn into_bytes(self) -> Result { + self.resp + .bytes() + .await + .map_err(|e| ValidationErr::HttpError(e).into()) + } + /// Returns the content size (in Bytes) of the object. pub fn object_size(&self) -> Result { self.resp diff --git a/src/s3/response_traits.rs b/src/s3/response_traits.rs index 8aa739c..04a1ae2 100644 --- a/src/s3/response_traits.rs +++ b/src/s3/response_traits.rs @@ -1,3 +1,69 @@ +//! Response traits for accessing S3 metadata from HTTP response headers. +//! +//! This module provides a collection of traits that enable typed, ergonomic access to +//! metadata from S3 API responses. These traits extract data from HTTP headers and response +//! bodies returned by various S3 operations. +//! +//! # Design Philosophy +//! +//! Rather than exposing raw headers directly, these traits provide: +//! - **Type-safe access**: Automatic parsing and type conversion +//! - **Consistent API**: Uniform method names across different response types +//! - **Composability**: Mix and match traits based on what metadata is available +//! +//! # Metadata Sources +//! +//! Metadata is available from two primary sources: +//! +//! ## 1. HEAD Requests (Metadata Only) +//! +//! 
Operations like [`stat_object`](crate::s3::client::MinioClient::stat_object) use HEAD requests +//! to retrieve object metadata without downloading the object body. These responses typically +//! implement traits like: +//! - [`HasVersion`]: Object version ID (via `x-amz-version-id` header) +//! - [`HasObjectSize`]: Object size in bytes (via `x-amz-object-size` or `Content-Length` header) +//! - [`HasEtagFromHeaders`]: Object ETag/hash (via `ETag` header) +//! - [`HasChecksumHeaders`]: Object checksum values (via `x-amz-checksum-*` headers) +//! - [`HasIsDeleteMarker`]: Whether the object is a delete marker (via `x-amz-delete-marker` header) +//! +//! ## 2. GET Requests (Metadata + Body) +//! +//! Operations like [`get_object`](crate::s3::client::MinioClient::get_object) return both +//! metadata headers AND the object body. These responses can implement both header-based +//! traits (above) and body-parsing traits like: +//! - [`HasEtagFromBody`]: ETag parsed from XML response body +//! +//! # Example: StatObjectResponse +//! +//! The [`StatObjectResponse`](crate::s3::response::StatObjectResponse) demonstrates how +//! multiple traits compose together. It uses a HEAD request and provides: +//! +//! ```rust,ignore +//! impl HasBucket for StatObjectResponse {} +//! impl HasRegion for StatObjectResponse {} +//! impl HasObject for StatObjectResponse {} +//! impl HasEtagFromHeaders for StatObjectResponse {} +//! impl HasIsDeleteMarker for StatObjectResponse {} +//! impl HasChecksumHeaders for StatObjectResponse {} +//! impl HasVersion for StatObjectResponse {} // Version ID from header +//! impl HasObjectSize for StatObjectResponse {} // Size from header +//! ``` +//! +//! This allows users to access metadata uniformly: +//! +//! ```rust,ignore +//! let response = client.stat_object(&args).await?; +//! let size = response.object_size(); // From HasObjectSize trait +//! let version = response.version_id(); // From HasVersion trait +//! let checksum = response.checksum_crc32c()?; // From HasChecksumHeaders trait +//! ``` +//! +//! # Performance Considerations +//! +//! - **HEAD vs GET**: HEAD requests are faster when you only need metadata (no body transfer) +//! - **Header parsing**: Trait methods use `#[inline]` for zero-cost abstractions +//! - **Lazy evaluation**: Metadata is parsed on-demand, not upfront + use crate::s3::error::ValidationErr; use crate::s3::header_constants::*; use crate::s3::types::S3Request; diff --git a/src/s3/signer.rs b/src/s3/signer.rs index f8f9306..1a4fa12 100644 --- a/src/s3/signer.rs +++ b/src/s3/signer.rs @@ -14,6 +14,15 @@ // limitations under the License. //! Signature V4 for S3 API +//! +//! Includes signing key caching for performance optimization. +//! The signing key only depends on (secret_key, date, region, service), +//! so we store the last computed key and reuse it when inputs match. +//! +//! Caching is per-client to support: +//! - Multiple clients with different credentials in the same process +//! - Credential rotation where old and new credentials are used simultaneously +//! - Multi-tenant applications use crate::s3::header_constants::*; use crate::s3::multimap_ext::{Multimap, MultimapExt}; @@ -25,6 +34,97 @@ use hyper::http::Method; use ring::hmac; #[cfg(not(feature = "ring"))] use sha2::Sha256; +use std::sync::{Arc, RwLock}; + +/// Cached precomputation of AWS Signature V4 signing keys. +/// +/// Computing a signing key requires 4 HMAC-SHA256 operations. 
Since the key only +/// changes when date, region, or service changes, we cache the result to avoid +/// redundant computation on subsequent requests. +/// +/// This is stored per-client (in `SharedClientItems`) rather than globally to +/// support multiple clients with different credentials in the same process. +/// +/// # Validation +/// +/// **What we validate:** +/// - Date (YYYYMMDD): Changes daily, always validated +/// - Region: Changes per bucket, always validated +/// - Service: Always "s3", validated for correctness +/// +/// **What we DON'T validate:** +/// - Secret key: Deliberately omitted for security and performance +/// +/// **Why not validate secret key?** +/// +/// 1. **Security**: Storing the secret key (even hashed) increases memory exposure risk +/// 2. **Performance**: Hashing the secret key on every cache check adds overhead +/// 3. **Acceptable tradeoff**: Credential rotation is rare; the caller can handle +/// authentication errors by creating a new client with updated credentials +/// +/// # Concurrency +/// +/// Uses RwLock to allow concurrent reads while only blocking for writes. +/// Uses Arc for zero-copy sharing of the signing key across threads. +#[derive(Debug, Clone)] +pub(crate) struct SigningKeyCache { + /// The cached signing key (Arc allows zero-copy sharing on cache hits) + key: Arc<[u8]>, + /// The date string (YYYYMMDD) this key was computed for + date_str: String, + /// The region this key was computed for + region: String, + /// The service name this key was computed for + service: String, +} + +impl Default for SigningKeyCache { + fn default() -> Self { + Self::new() + } +} + +impl SigningKeyCache { + pub(crate) fn new() -> Self { + Self { + key: Arc::from(Vec::new()), + date_str: String::new(), + region: String::new(), + service: String::new(), + } + } + + /// Checks if the cached signing key is valid for the given parameters. + /// + /// Note: Does NOT validate the secret key. See struct-level documentation + /// for the rationale behind this design decision. + #[inline] + fn matches(&self, date_str: &str, region: &str, service: &str) -> bool { + // Check most likely to change first (date changes daily) + self.date_str == date_str && self.region == region && self.service == service + } + + /// Returns the cached signing key if it matches the given parameters. + /// + /// Returns `None` if the cache is invalid (different date/region/service). + /// Uses Arc::clone for zero-copy sharing (just atomic reference count increment). + #[inline] + fn get_key_if_matches(&self, date_str: &str, region: &str, service: &str) -> Option> { + if self.matches(date_str, region, service) { + Some(Arc::clone(&self.key)) + } else { + None + } + } + + /// Updates the cache with a new signing key and associated parameters. + fn update(&mut self, key: Arc<[u8]>, date_str: String, region: String, service: String) { + self.key = key; + self.date_str = date_str; + self.region = region; + self.service = service; + } +} /// Returns HMAC hash for given key and data. fn hmac_hash(key: &[u8], data: &[u8]) -> Vec { @@ -78,17 +178,87 @@ fn get_string_to_sign(date: UtcTime, scope: &str, canonical_request_hash: &str) ) } -/// Returns signing key of given secret key, date, region and service name. -fn get_signing_key(secret_key: &str, date: UtcTime, region: &str, service_name: &str) -> Vec { +/// Computes the signing key (uncached) for given secret key, date, region and service name. 
+fn compute_signing_key( + secret_key: &str, + date_str: &str, + region: &str, + service_name: &str, +) -> Vec { let mut key: Vec = b"AWS4".to_vec(); key.extend(secret_key.as_bytes()); - let date_key = hmac_hash(key.as_slice(), to_signer_date(date).as_bytes()); + let date_key = hmac_hash(key.as_slice(), date_str.as_bytes()); let date_region_key = hmac_hash(date_key.as_slice(), region.as_bytes()); let date_region_service_key = hmac_hash(date_region_key.as_slice(), service_name.as_bytes()); hmac_hash(date_region_service_key.as_slice(), b"aws4_request") } +/// Returns signing key of given secret key, date, region and service name. +/// +/// Uses caching to avoid recomputing the signing key for every request. +/// The signing key only changes when the date (YYYYMMDD), region, or service changes, +/// so we store the last computed key and reuse it when inputs match. +/// +/// # Performance +/// +/// **Cache hits (common case after first request of the day per region):** +/// - Returns cached key via Arc::clone (atomic reference count increment) +/// - Multiple threads can read simultaneously via RwLock +/// +/// **Cache misses (daily date change or region change):** +/// - Computes new signing key (4 HMAC-SHA256 operations) +/// - Computation happens outside the lock to avoid blocking readers +/// - Brief write lock to update cache with new key +/// +/// # Credential Rotation +/// +/// The cache does not validate credentials - it only checks date/region/service. +/// If credentials rotate while date/region/service remain the same, the cached +/// signing key (derived from old credentials) will be used, causing S3 to return +/// an authentication error. The caller is responsible for handling credential +/// rotation at a higher level. +fn get_signing_key( + cache: &RwLock, + secret_key: &str, + date: UtcTime, + region: &str, + service_name: &str, +) -> Arc<[u8]> { + let date_str = to_signer_date(date); + + // Fast path: try to get from cache with read lock (allows concurrent reads) + // Zero allocations on cache hit - just Arc::clone (atomic increment) + if let Ok(cache_guard) = cache.read() + && let Some(key) = cache_guard.get_key_if_matches(&date_str, region, service_name) + { + return key; + } + + // Cache miss - compute the signing key outside the lock (4 HMAC operations) + // Multiple threads may compute simultaneously on cache miss, but that's acceptable + // since HMAC is deterministic and the brief redundant computation is better than + // blocking all threads during the expensive operation. + let signing_key = Arc::from(compute_signing_key( + secret_key, + &date_str, + region, + service_name, + )); + + // Update cache with write lock (brief, just updating Arc references) + if let Ok(mut cache_guard) = cache.write() { + cache_guard.update( + Arc::clone(&signing_key), + date_str, + region.to_string(), + service_name.to_string(), + ); + } + + signing_key +} + /// Returns signature value for given signing key and string-to-sign. fn get_signature(signing_key: &[u8], string_to_sign: &[u8]) -> String { hmac_hash_hex(signing_key, string_to_sign) @@ -108,6 +278,7 @@ fn get_authorization( /// Signs and updates headers for given parameters. 
fn sign_v4( + cache: &RwLock, service_name: &str, method: &Method, uri: &str, @@ -131,15 +302,18 @@ fn sign_v4( content_sha256, ); let string_to_sign = get_string_to_sign(date, &scope, &canonical_request_hash); - let signing_key = get_signing_key(secret_key, date, region, service_name); - let signature = get_signature(signing_key.as_slice(), string_to_sign.as_bytes()); + let signing_key = get_signing_key(cache, secret_key, date, region, service_name); + let signature = get_signature(&signing_key, string_to_sign.as_bytes()); let authorization = get_authorization(access_key, &scope, &signed_headers, &signature); headers.add(AUTHORIZATION, authorization); } /// Signs and updates headers for the given S3 request parameters. +/// +/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`. pub(crate) fn sign_v4_s3( + cache: &RwLock, method: &Method, uri: &str, region: &str, @@ -151,6 +325,7 @@ pub(crate) fn sign_v4_s3( date: UtcTime, ) { sign_v4( + cache, "s3", method, uri, @@ -165,7 +340,10 @@ pub(crate) fn sign_v4_s3( } /// Signs and updates query parameters for the given presigned request. +/// +/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`. pub(crate) fn presign_v4( + cache: &RwLock, method: &Method, host: &str, uri: &str, @@ -196,21 +374,24 @@ pub(crate) fn presign_v4( "UNSIGNED-PAYLOAD", ); let string_to_sign = get_string_to_sign(date, &scope, &canonical_request_hash); - let signing_key = get_signing_key(secret_key, date, region, "s3"); - let signature = get_signature(signing_key.as_slice(), string_to_sign.as_bytes()); + let signing_key = get_signing_key(cache, secret_key, date, region, "s3"); + let signature = get_signature(&signing_key, string_to_sign.as_bytes()); query_params.add(X_AMZ_SIGNATURE, signature); } /// Returns signature for the given presigned POST request parameters. +/// +/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`. 
pub(crate) fn post_presign_v4( + cache: &RwLock, string_to_sign: &str, secret_key: &str, date: UtcTime, region: &str, ) -> String { - let signing_key = get_signing_key(secret_key, date, region, "s3"); - get_signature(signing_key.as_slice(), string_to_sign.as_bytes()) + let signing_key = get_signing_key(cache, secret_key, date, region, "s3"); + get_signature(&signing_key, string_to_sign.as_bytes()) } #[cfg(test)] @@ -226,12 +407,18 @@ mod tests { Utc.with_ymd_and_hms(2013, 5, 24, 0, 0, 0).unwrap() } + // Create a test cache for unit tests + fn test_cache() -> RwLock { + RwLock::new(SigningKeyCache::new()) + } + // =========================== // sign_v4_s3 Tests (Public API) // =========================== #[test] fn test_sign_v4_s3_adds_authorization_header() { + let cache = test_cache(); let method = Method::GET; let uri = "/bucket/key"; let region = "us-east-1"; @@ -249,6 +436,7 @@ mod tests { let query_params = Multimap::new(); sign_v4_s3( + &cache, &method, uri, region, @@ -270,6 +458,7 @@ mod tests { #[test] fn test_sign_v4_s3_deterministic() { + let cache = test_cache(); let method = Method::GET; let uri = "/test"; let region = "us-east-1"; @@ -290,6 +479,7 @@ mod tests { headers2.add(X_AMZ_DATE, "20130524T000000Z"); sign_v4_s3( + &cache, &method, uri, region, @@ -302,6 +492,7 @@ mod tests { ); sign_v4_s3( + &cache, &method, uri, region, @@ -319,6 +510,7 @@ mod tests { #[test] fn test_sign_v4_s3_different_methods() { + let cache = test_cache(); let region = "us-east-1"; let uri = "/test"; let access_key = "test"; @@ -338,6 +530,7 @@ mod tests { headers_put.add(X_AMZ_DATE, "20130524T000000Z"); sign_v4_s3( + &cache, &Method::GET, uri, region, @@ -350,6 +543,7 @@ mod tests { ); sign_v4_s3( + &cache, &Method::PUT, uri, region, @@ -370,6 +564,7 @@ mod tests { #[test] fn test_sign_v4_s3_with_special_characters() { + let cache = test_cache(); let method = Method::GET; let uri = "/bucket/my file.txt"; // Space in filename let region = "us-east-1"; @@ -387,6 +582,7 @@ mod tests { // Should not panic sign_v4_s3( + &cache, &method, uri, region, @@ -407,6 +603,7 @@ mod tests { #[test] fn test_presign_v4_adds_query_params() { + let cache = test_cache(); let method = Method::GET; let host = "s3.amazonaws.com"; let uri = "/bucket/key"; @@ -418,6 +615,7 @@ mod tests { let expires = 3600; presign_v4( + &cache, &method, host, uri, @@ -440,6 +638,7 @@ mod tests { #[test] fn test_presign_v4_algorithm_value() { + let cache = test_cache(); let method = Method::GET; let host = "s3.amazonaws.com"; let uri = "/test"; @@ -451,6 +650,7 @@ mod tests { let expires = 3600; presign_v4( + &cache, &method, host, uri, @@ -468,6 +668,7 @@ mod tests { #[test] fn test_presign_v4_expires_value() { + let cache = test_cache(); let method = Method::GET; let host = "s3.amazonaws.com"; let uri = "/test"; @@ -479,6 +680,7 @@ mod tests { let expires = 7200; presign_v4( + &cache, &method, host, uri, @@ -496,6 +698,7 @@ mod tests { #[test] fn test_presign_v4_credential_format() { + let cache = test_cache(); let method = Method::GET; let host = "s3.amazonaws.com"; let uri = "/test"; @@ -507,6 +710,7 @@ mod tests { let expires = 3600; presign_v4( + &cache, &method, host, uri, @@ -532,12 +736,13 @@ mod tests { #[test] fn test_post_presign_v4() { + let cache = test_cache(); let string_to_sign = "test_string_to_sign"; let secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"; let date = get_test_date(); let region = "us-east-1"; - let signature = post_presign_v4(string_to_sign, secret_key, date, region); + let signature = 
post_presign_v4(&cache, string_to_sign, secret_key, date, region); // Should produce 64 character hex signature assert_eq!(signature.len(), 64); @@ -546,13 +751,14 @@ mod tests { #[test] fn test_post_presign_v4_deterministic() { + let cache = test_cache(); let string_to_sign = "test_string"; let secret_key = "test_secret"; let date = get_test_date(); let region = "us-east-1"; - let sig1 = post_presign_v4(string_to_sign, secret_key, date, region); - let sig2 = post_presign_v4(string_to_sign, secret_key, date, region); + let sig1 = post_presign_v4(&cache, string_to_sign, secret_key, date, region); + let sig2 = post_presign_v4(&cache, string_to_sign, secret_key, date, region); assert_eq!(sig1, sig2); } diff --git a/src/s3/types/minio_error_response.rs b/src/s3/types/minio_error_response.rs index 318f0b3..004b23a 100644 --- a/src/s3/types/minio_error_response.rs +++ b/src/s3/types/minio_error_response.rs @@ -254,6 +254,30 @@ impl MinioErrorResponse { } } + /// Create a minimal error response from status code and message. + /// + /// Used for fast-path operations where full error details aren't available. + pub fn from_status_and_message(status_code: u16, message: String) -> Self { + let code = match status_code { + 404 => MinioErrorCode::NoSuchKey, + 403 => MinioErrorCode::AccessDenied, + 401 => MinioErrorCode::AccessDenied, + 400 => MinioErrorCode::BadRequest, + 409 => MinioErrorCode::ResourceConflict, + _ => MinioErrorCode::OtherError(format!("HTTP {}", status_code)), + }; + Self { + headers: HeaderMap::new(), + code, + message: Some(message), + resource: String::new(), + request_id: String::new(), + host_id: String::new(), + bucket_name: None, + object_name: None, + } + } + pub fn new_from_body(body: Bytes, headers: HeaderMap) -> Result { let root = Element::parse(body.reader()).map_err(ValidationErr::from)?; Ok(Self { diff --git a/src/s3/utils.rs b/src/s3/utils.rs index 4ec5d4b..bad58e9 100644 --- a/src/s3/utils.rs +++ b/src/s3/utils.rs @@ -941,7 +941,10 @@ pub fn get_text_option(element: &Element, tag: &str) -> Option { .and_then(|v| v.get_text().map(|s| s.to_string())) } -/// Trims leading and trailing quotes from a string. Note: consumes the input string. +/// Trims leading and trailing quotes from a string. +/// +/// Takes ownership of and potentially modifies the input string in place +/// (via `drain` and `pop`). The original string is not preserved. pub fn trim_quotes(mut s: String) -> String { if s.len() >= 2 && s.starts_with('"') && s.ends_with('"') { s.drain(0..1); // remove the leading quote diff --git a/tests/s3/client_config.rs b/tests/s3/client_config.rs new file mode 100644 index 0000000..b75494b --- /dev/null +++ b/tests/s3/client_config.rs @@ -0,0 +1,191 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2025 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for client configuration options like skip_region_lookup. 
+
+use bytes::Bytes;
+use minio::s3::MinioClient;
+use minio::s3::client::MinioClientBuilder;
+use minio::s3::creds::StaticProvider;
+use minio::s3::response::{GetObjectResponse, PutObjectContentResponse};
+use minio::s3::response_traits::{HasBucket, HasObject};
+use minio::s3::types::{S3Api, ToStream};
+use minio_common::test_context::TestContext;
+use minio_common::utils::rand_object_name;
+
+/// Helper to create a client with skip_region_lookup enabled.
+fn create_client_with_skip_region_lookup(ctx: &TestContext) -> MinioClient {
+    let mut builder = MinioClientBuilder::new(ctx.base_url.clone())
+        .provider(Some(StaticProvider::new(
+            &ctx.access_key,
+            &ctx.secret_key,
+            None,
+        )))
+        .skip_region_lookup(true);
+
+    if let Some(ignore_cert) = ctx.ignore_cert_check {
+        builder = builder.ignore_cert_check(Some(ignore_cert));
+    }
+
+    if let Some(ref ssl_cert_file) = ctx.ssl_cert_file {
+        builder = builder.ssl_cert_file(Some(ssl_cert_file));
+    }
+
+    builder.build().unwrap()
+}
+
+/// Test that skip_region_lookup allows basic put/get operations.
+/// This verifies operations work correctly when region lookup is skipped.
+#[minio_macros::test]
+async fn skip_region_lookup_put_get_object(ctx: TestContext, bucket_name: String) {
+    let client = create_client_with_skip_region_lookup(&ctx);
+    let object_name = rand_object_name();
+    let data: Bytes = Bytes::from("test data with skip_region_lookup");
+
+    // Put object using client with skip_region_lookup
+    let put_resp: PutObjectContentResponse = client
+        .put_object_content(&bucket_name, &object_name, data.clone())
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    assert_eq!(put_resp.bucket(), bucket_name);
+    assert_eq!(put_resp.object(), object_name);
+
+    // Get object using the same client
+    let get_resp: GetObjectResponse = client
+        .get_object(&bucket_name, &object_name)
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    assert_eq!(get_resp.bucket(), bucket_name);
+    assert_eq!(get_resp.object(), object_name);
+
+    let got = get_resp.into_bytes().await.unwrap();
+    assert_eq!(got, data);
+}
+
+/// Test that skip_region_lookup works for bucket operations.
+#[minio_macros::test]
+async fn skip_region_lookup_bucket_exists(ctx: TestContext, bucket_name: String) {
+    let client = create_client_with_skip_region_lookup(&ctx);
+
+    // Check bucket exists using client with skip_region_lookup
+    let exists = client
+        .bucket_exists(&bucket_name)
+        .build()
+        .send()
+        .await
+        .unwrap()
+        .exists();
+
+    assert!(exists, "Bucket should exist");
+}
+
+/// Test that skip_region_lookup works for list operations.
+#[minio_macros::test]
+async fn skip_region_lookup_list_objects(ctx: TestContext, bucket_name: String) {
+    let client = create_client_with_skip_region_lookup(&ctx);
+
+    // List objects using client with skip_region_lookup
+    // Just verify the operation completes without error
+    let mut stream = client.list_objects(&bucket_name).build().to_stream().await;
+
+    use futures_util::StreamExt;
+    // Consume the stream - may be empty, but should not error
+    let mut count = 0;
+    while let Some(result) = stream.next().await {
+        // Just verify we can read items without error
+        let _item = result.unwrap();
+        count += 1;
+        // Don't iterate forever
+        if count > 100 {
+            break;
+        }
+    }
+
+    // Test passes if we get here without error
+}
+
+/// Test that multiple operations work in sequence with skip_region_lookup.
+/// This verifies that the default region is consistently used.
+#[minio_macros::test]
+async fn skip_region_lookup_multiple_operations(ctx: TestContext, bucket_name: String) {
+    let client = create_client_with_skip_region_lookup(&ctx);
+
+    // Perform multiple operations to ensure consistent behavior
+    for i in 0..3 {
+        let object_name = format!("test-object-{}", i);
+        let data: Bytes = Bytes::from(format!("data for object {}", i));
+
+        // Put
+        client
+            .put_object_content(&bucket_name, &object_name, data.clone())
+            .build()
+            .send()
+            .await
+            .unwrap();
+
+        // Get
+        let resp: GetObjectResponse = client
+            .get_object(&bucket_name, &object_name)
+            .build()
+            .send()
+            .await
+            .unwrap();
+
+        let got = resp.into_bytes().await.unwrap();
+        assert_eq!(got, data);
+
+        // Delete
+        client
+            .delete_object(&bucket_name, &object_name)
+            .build()
+            .send()
+            .await
+            .unwrap();
+    }
+}
+
+/// Test that skip_region_lookup does not affect stat_object operations.
+#[minio_macros::test]
+async fn skip_region_lookup_stat_object(ctx: TestContext, bucket_name: String) {
+    let client = create_client_with_skip_region_lookup(&ctx);
+    let object_name = rand_object_name();
+    let data: Bytes = Bytes::from("test data for stat");
+
+    // Put object
+    client
+        .put_object_content(&bucket_name, &object_name, data.clone())
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Stat object using client with skip_region_lookup
+    let stat_resp = client
+        .stat_object(&bucket_name, &object_name)
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    assert_eq!(stat_resp.bucket(), bucket_name);
+    assert_eq!(stat_resp.object(), object_name);
+    assert_eq!(stat_resp.size().unwrap(), data.len() as u64);
+}
diff --git a/tests/s3/get_object.rs b/tests/s3/get_object.rs
index e06378c..338f5fa 100644
--- a/tests/s3/get_object.rs
+++ b/tests/s3/get_object.rs
@@ -14,6 +14,7 @@
 // limitations under the License.
 
 use bytes::Bytes;
+use futures_util::TryStreamExt;
 use minio::s3::response::{GetObjectResponse, PutObjectContentResponse};
 use minio::s3::response_traits::{HasBucket, HasObject};
 use minio::s3::types::S3Api;
@@ -65,3 +66,129 @@ async fn get_object_1(ctx: TestContext, bucket_name: String) {
 async fn get_object_2(ctx: TestContext, bucket_name: String) {
     test_get_object(&ctx, &bucket_name, "a b+c").await;
 }
+
+/// Test into_bytes method for direct byte retrieval.
+#[minio_macros::test]
+async fn get_object_into_bytes(ctx: TestContext, bucket_name: String) {
+    let object_name = rand_object_name_utf8(20);
+    let data: Bytes = Bytes::from("test data for into_bytes method");
+
+    // Upload test object
+    ctx.client
+        .put_object_content(&bucket_name, &object_name, data.clone())
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Retrieve using into_bytes
+    let resp: GetObjectResponse = ctx
+        .client
+        .get_object(&bucket_name, &object_name)
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Verify content-length before consuming
+    assert_eq!(resp.object_size().unwrap(), data.len() as u64);
+
+    // Get bytes directly
+    let got = resp.into_bytes().await.unwrap();
+    assert_eq!(got, data);
+}
+
+/// Test into_boxed_stream method for streaming access.
+#[minio_macros::test]
+async fn get_object_into_boxed_stream(ctx: TestContext, bucket_name: String) {
+    let object_name = rand_object_name_utf8(20);
+    let data: Bytes = Bytes::from("test data for into_boxed_stream method");
+
+    // Upload test object
+    ctx.client
+        .put_object_content(&bucket_name, &object_name, data.clone())
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Retrieve using into_boxed_stream
+    let resp: GetObjectResponse = ctx
+        .client
+        .get_object(&bucket_name, &object_name)
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Get stream and content length
+    let (stream, content_length) = resp.into_boxed_stream().unwrap();
+    assert_eq!(content_length, data.len() as u64);
+
+    // Collect all bytes from the stream
+    let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
+    let got: Bytes = chunks.into_iter().flatten().collect();
+    assert_eq!(got, data);
+}
+
+/// Test into_boxed_stream with larger content to verify chunked streaming.
+#[minio_macros::test]
+async fn get_object_into_boxed_stream_large(ctx: TestContext, bucket_name: String) {
+    let object_name = rand_object_name_utf8(20);
+    // Create larger test data (1MB) to ensure multiple chunks
+    let data: Bytes = Bytes::from(vec![0xABu8; 1024 * 1024]);
+
+    // Upload test object
+    ctx.client
+        .put_object_content(&bucket_name, &object_name, data.clone())
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Retrieve using into_boxed_stream
+    let resp: GetObjectResponse = ctx
+        .client
+        .get_object(&bucket_name, &object_name)
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    let (stream, content_length) = resp.into_boxed_stream().unwrap();
+    assert_eq!(content_length, data.len() as u64);
+
+    // Collect and verify
+    let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
+    let got: Bytes = chunks.into_iter().flatten().collect();
+    assert_eq!(got.len(), data.len());
+    assert_eq!(got, data);
+}
+
+/// Test into_bytes with empty content.
+#[minio_macros::test]
+async fn get_object_into_bytes_empty(ctx: TestContext, bucket_name: String) {
+    let object_name = rand_object_name_utf8(20);
+    let data: Bytes = Bytes::new();
+
+    // Upload empty object
+    ctx.client
+        .put_object_content(&bucket_name, &object_name, data.clone())
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    // Retrieve using into_bytes
+    let resp: GetObjectResponse = ctx
+        .client
+        .get_object(&bucket_name, &object_name)
+        .build()
+        .send()
+        .await
+        .unwrap();
+
+    assert_eq!(resp.object_size().unwrap(), 0);
+    let got = resp.into_bytes().await.unwrap();
+    assert!(got.is_empty());
+}
diff --git a/tests/s3/mod.rs b/tests/s3/mod.rs
index f5d4959..c72724f 100644
--- a/tests/s3/mod.rs
+++ b/tests/s3/mod.rs
@@ -15,6 +15,9 @@
 
 //! S3 API Integration Tests
 
+// Client configuration
+mod client_config;
+
 // Object operations
 mod append_object;
 mod get_object;
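
Note (not part of the patch): a minimal usage sketch of the options exercised by the tests above, combining `skip_region_lookup(true)` on the builder with `into_bytes()` on the GET response. The endpoint, credentials, bucket and object names are placeholder assumptions, the `BaseUrl` import path and the boxed-error conversions are assumed from the crate's public API, and no claim is made that this is the canonical example for the SDK.

```rust
use minio::s3::client::MinioClientBuilder;
use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl;
use minio::s3::types::S3Api;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical local endpoint and credentials; replace with real values.
    let base_url: BaseUrl = "http://localhost:9000".parse()?;
    let client = MinioClientBuilder::new(base_url)
        .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None)))
        // Skip the bucket-location lookup and sign with the default region.
        .skip_region_lookup(true)
        .build()?;

    // into_bytes() buffers the whole object in memory; for large objects,
    // prefer into_boxed_stream() and consume the chunks incrementally.
    let resp = client
        .get_object("my-bucket", "my-object")
        .build()
        .send()
        .await?;
    let data = resp.into_bytes().await?;
    println!("read {} bytes", data.len());
    Ok(())
}
```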