mirror of https://github.com/minio/minio-rs.git
synced 2026-01-22 15:42:10 +08:00

performance optimizations: Signing key caching, stat_object to use HEAD (#197)

This PR implements performance optimizations across the MinIO Rust SDK, focusing on reducing latency and improving throughput for high-performance use cases such as DataFusion/ObjectStore integration. Key improvements include signing key caching, HTTP/2 optimization, region lookup bypass, and fast-path APIs.

Key changes:
- Performance optimizations: signing key caching (4 HMAC operations saved per request), HTTP/2 adaptive window, optimized query/header string building, and a fast-path GET API.
- Bug fixes: corrected multipart copy logic, changed stat_object to use the HEAD method, fixed region handling in tests.
- API enhancements: added get_object_fast(), into_boxed_stream(), and into_bytes() methods for high-performance scenarios.
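As a quick orientation, the sketch below shows roughly how the new knobs and the fast-path GET introduced by this PR fit together. It is a hedged example based on the signatures added in this diff; the endpoint, credentials, and bucket/object names are placeholders, and error handling is reduced to `unwrap()`/`expect()` for brevity.

```rust
use minio::s3::client::MinioClientBuilder;
use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl;

#[tokio::main]
async fn main() {
    // Placeholder endpoint and credentials; adjust for your deployment.
    let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
    let client = MinioClientBuilder::new(base_url)
        .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None)))
        .skip_region_lookup(true) // new in this PR: skip region discovery on MinIO
        .build()
        .unwrap();

    // Fast-path GET (new in this PR): returns the raw reqwest::Response,
    // bypassing hooks and region lookup. `None` means "no byte range".
    let resp = client
        .get_object_fast("my-bucket", "my-object", None)
        .await
        .expect("fast-path GET failed");
    let body = resp.bytes().await.expect("body read failed"); // reqwest's body helper
    println!("downloaded {} bytes", body.len());
}
```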
This commit is contained in:
parent 2daacc0fcf
commit 1b7ae9e473

28  CLAUDE.md
@@ -2,7 +2,6 @@

 - Only provide actionable feedback.
 - Exclude code style comments on generated files. These will have a header signifying that.
-- Use github markdown folded sections for all items.
 - Do not use emojis.
 - Do not add a "feel good" section.

@@ -53,6 +52,8 @@ All source files that haven't been generated MUST include the following copyright
 - Avoid obvious comments like `// Set x to 5` for `let x = 5;`
 - Only add comments when they explain WHY, not WHAT
 - Document complex algorithms or non-obvious business logic
+- **NO historical references** - Never write comments like "Use X instead of Y" or "Replaces old Z" that reference removed code. Future readers won't have context about what was removed. Just describe what the code does now.
+- **Use precise terminology** - Use accurate technical terms (e.g., "memoization" for multi-entry caching keyed by input parameters, "cache" for single-value storage). Imprecise terminology confuses readers about actual behavior.

 ## Critical Code Patterns

@@ -110,12 +111,18 @@ impl Client {
 - Use `Cow<'_, str>` to avoid unnecessary allocations
 - Prefer iterators over collecting into intermediate vectors
 - Use `Box<dyn Trait>` sparingly; prefer generics when possible
+- Prefer per-instance state over global statics to support multiple instances with different configurations

 5. **Async Patterns**
 - Use `tokio::select!` for concurrent operations
 - Avoid blocking operations in async contexts
 - Use `async-trait` for async trait methods

+6. **API Documentation**
+- Document memory implications for methods that load data into memory
+- Point users to streaming alternatives for large data handling
+- Be explicit about peak memory usage when relevant
+
 ## Code Quality Principles

 ### Why Code Quality Standards Are Mandatory

@@ -220,18 +227,16 @@ Claude will periodically analyze the codebase and suggest:

 ### Pre-commit Checklist

-**MANDATORY: ALL steps must pass before submitting any PR. No warnings or errors are acceptable.**
+**MANDATORY: Run these steps before every commit. No warnings or errors are acceptable.**

-Before any code changes:
-1. ✅ **Format code**: Run `cargo fmt --all` to fix all formatting issues
-2. ✅ **Fix clippy warnings**: Run `cargo clippy --fix --allow-dirty --allow-staged --all-targets` to auto-fix lints
-3. ✅ **Verify clippy clean**: Run `cargo clippy --all-targets` and ensure **ZERO warnings**
-4. ✅ **Run all tests**: Run `cargo test` to ensure all tests pass
-5. ✅ **Build everything**: Run `cargo build --all-targets` to verify all code compiles
-6. ✅ **Test coverage**: Ensure new code has appropriate test coverage
-7. ✅ **No redundant comments**: Verify no redundant comments are added
+1. ✅ **Format code**: `cargo fmt --all`
+2. ✅ **Fix clippy warnings**: `cargo clippy --fix --allow-dirty --allow-staged --all-targets`
+3. ✅ **Verify clippy clean**: `cargo clippy --all-targets` (must show **ZERO warnings**)
+4. ✅ **Run all tests**: `cargo test`
+5. ✅ **Run doc tests**: `cargo test --doc`
+6. ✅ **Build everything**: `cargo build --all-targets`

-**Note:** If clippy shows warnings, you MUST fix them. Use `cargo clippy --fix` or fix manually.
+**Note:** If clippy shows warnings, you MUST fix them before committing.

 ## MinIO Server Setup for Testing

@@ -373,6 +378,7 @@ fn operation() -> Result<Response, Error> {
 - **Auto-fix clippy**: `cargo clippy --fix --allow-dirty --allow-staged --all-targets`
 - **Check clippy**: `cargo clippy --all-targets` (must show zero warnings)
 - **Run tests**: `cargo test`
+- **Run doc tests**: `cargo test --doc`
 - **Run specific test**: `cargo test test_name`
 - **Build all**: `cargo build --all-targets`
 - **Build release**: `cargo build --release`
@@ -11,12 +11,16 @@ keywords = ["object-storage", "minio", "s3"]
 categories = ["api-bindings", "web-programming::http-client"]

 [features]
-default = ["default-tls", "default-crypto"]
+default = ["default-tls", "default-crypto", "http2"]
 default-tls = ["reqwest/default-tls"]
 native-tls = ["reqwest/native-tls"]
 rustls-tls = ["reqwest/rustls-tls"]
 default-crypto = ["dep:sha2", "dep:hmac"]
+# ring provides faster crypto using assembly optimizations
 ring = ["dep:ring"]
+# HTTP/2 support for improved throughput via multiplexing.
+# Gracefully falls back to HTTP/1.1 when the server doesn't support it.
+http2 = ["reqwest/http2"]
 localhost = []

 [workspace.dependencies]

@@ -60,7 +64,6 @@ regex = "1.12"
 ring = { version = "0.17", optional = true, default-features = false, features = ["alloc"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-serde_yaml = "0.9"
 sha2 = { version = "0.10", optional = true }
 urlencoding = "2.1"
 xmltree = "0.12"
@@ -100,6 +100,7 @@ impl GetPresignedObjectUrl {
         };

         presign_v4(
+            &self.client.shared.signing_key_cache,
             &self.method,
             &url.host_header_value(),
             &url.path,
@@ -17,12 +17,13 @@ use crate::s3::client::MinioClient;
 use crate::s3::creds::Credentials;
 use crate::s3::error::{Error, ValidationErr};
 use crate::s3::header_constants::*;
-use crate::s3::signer::post_presign_v4;
+use crate::s3::signer::{SigningKeyCache, post_presign_v4};
 use crate::s3::utils::{
     UtcTime, b64_encode, check_bucket_name, to_amz_date, to_iso8601utc, to_signer_date, utc_now,
 };
 use serde_json::{Value, json};
 use std::collections::HashMap;
+use std::sync::RwLock;
 use typed_builder::TypedBuilder;

 /// Argument builder for generating presigned POST policy for the [`POST Object`](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html) S3 API operation.

@@ -46,6 +47,7 @@ impl GetPresignedPolicyFormData {
         let creds: Credentials = self.client.shared.provider.as_ref().unwrap().fetch();
         self.policy
             .form_data(
+                &self.client.shared.signing_key_cache,
                 creds.access_key,
                 creds.secret_key,
                 creds.session_token,

@@ -293,8 +295,9 @@ impl PostPolicy {

     /// Generates form data for given access/secret keys, optional session token and region.
     /// The returned map contains `x-amz-algorithm`, `x-amz-credential`, `x-amz-security-token`, `x-amz-date`, `policy` and `x-amz-signature` keys and values.
-    pub fn form_data(
+    pub(crate) fn form_data(
         &self,
+        signing_key_cache: &RwLock<SigningKeyCache>,
         access_key: String,
         secret_key: String,
         session_token: Option<String>,

@@ -354,7 +357,13 @@ impl PostPolicy {
         });

         let encoded_policy = b64_encode(policy.to_string());
-        let signature = post_presign_v4(&encoded_policy, &secret_key, date, &region);
+        let signature = post_presign_v4(
+            signing_key_cache,
+            &encoded_policy,
+            &secret_key,
+            date,
+            &region,
+        );

         let mut data: HashMap<String, String> = HashMap::new();
         data.insert(X_AMZ_ALGORITHM.into(), PostPolicy::ALGORITHM.to_string());
@@ -30,6 +30,12 @@ use typed_builder::TypedBuilder;
 /// Argument builder for the [`HeadObject`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html) S3 API operation.
 ///
 /// This struct constructs the parameters required for the [`Client::stat_object`](crate::s3::client::MinioClient::stat_object) method.
+///
+/// # HTTP Method
+///
+/// This operation uses the HTTP HEAD method, which retrieves object metadata
+/// without transferring the object body. This is more efficient than GET when
+/// you only need metadata (size, ETag, Content-Type, Last-Modified, etc.).
 #[derive(Debug, Clone, TypedBuilder)]
 pub struct StatObject {
     #[builder(!default)] // force required

@@ -115,7 +121,7 @@ impl ToS3Request for StatObject {

         Ok(S3Request::builder()
             .client(self.client)
-            .method(Method::GET)
+            .method(Method::HEAD)
             .region(self.region)
             .bucket(self.bucket)
             .object(self.object)
@@ -52,11 +52,19 @@ impl MinioClient {
     /// Retrieves the region for the specified bucket name from the cache.
     /// If the region is not found in the cache, it is fetched via a call to S3 or MinIO
     /// and then stored in the cache for future lookups.
+    ///
+    /// If `skip_region_lookup` is enabled on the client, this method returns
+    /// the default region immediately without making any network calls.
     pub async fn get_region_cached<S: Into<String>>(
         &self,
         bucket: S,
         region: &Option<String>, // the region as provided by the S3Request
     ) -> Result<String, Error> {
+        // If skip_region_lookup is enabled (for MinIO servers), return default region immediately
+        if self.shared.skip_region_lookup {
+            return Ok(DEFAULT_REGION.to_owned());
+        }
+
         // If a region is provided, validate it against the base_url region
         if let Some(requested_region) = region {
             if !self.shared.base_url.region.is_empty()

@@ -109,3 +117,117 @@ impl MinioClient {
         Ok(resolved_region)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::s3::client::MinioClientBuilder;
+    use crate::s3::creds::StaticProvider;
+    use crate::s3::http::BaseUrl;
+
+    fn create_test_client(skip_region_lookup: bool) -> MinioClient {
+        let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
+        MinioClientBuilder::new(base_url)
+            .provider(Some(StaticProvider::new("test", "test", None)))
+            .skip_region_lookup(skip_region_lookup)
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_skip_region_lookup_returns_default_region() {
+        let client = create_test_client(true);
+
+        // With skip_region_lookup enabled, should return default region immediately
+        let region = client.get_region_cached("any-bucket", &None).await.unwrap();
+
+        assert_eq!(region, DEFAULT_REGION);
+    }
+
+    #[tokio::test]
+    async fn test_skip_region_lookup_ignores_provided_region() {
+        let client = create_test_client(true);
+
+        // Even with a provided region, skip_region_lookup should return default
+        let region = client
+            .get_region_cached("any-bucket", &Some("eu-west-1".to_string()))
+            .await
+            .unwrap();
+
+        // skip_region_lookup takes precedence and returns default region
+        assert_eq!(region, DEFAULT_REGION);
+    }
+
+    #[tokio::test]
+    async fn test_skip_region_lookup_multiple_calls_return_same_region() {
+        let client = create_test_client(true);
+
+        // Multiple calls should consistently return the default region
+        for bucket in ["bucket1", "bucket2", "bucket3"] {
+            let region = client.get_region_cached(bucket, &None).await.unwrap();
+            assert_eq!(region, DEFAULT_REGION);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_without_skip_region_lookup_uses_provided_region() {
+        let client = create_test_client(false);
+
+        // Without skip_region_lookup, provided region should be used
+        let region = client
+            .get_region_cached("any-bucket", &Some("eu-west-1".to_string()))
+            .await
+            .unwrap();
+
+        assert_eq!(region, "eu-west-1");
+    }
+
+    #[tokio::test]
+    async fn test_without_skip_region_lookup_empty_bucket_returns_default() {
+        let client = create_test_client(false);
+
+        // Empty bucket name should return default region
+        let region = client.get_region_cached("", &None).await.unwrap();
+
+        assert_eq!(region, DEFAULT_REGION);
+    }
+
+    #[test]
+    fn test_skip_region_lookup_builder_default_is_false() {
+        let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
+        let client = MinioClientBuilder::new(base_url)
+            .provider(Some(StaticProvider::new("test", "test", None)))
+            .build()
+            .unwrap();
+
+        // Default should be false (region lookup enabled)
+        assert!(!client.shared.skip_region_lookup);
+    }
+
+    #[test]
+    fn test_skip_region_lookup_builder_can_be_enabled() {
+        let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
+        let client = MinioClientBuilder::new(base_url)
+            .provider(Some(StaticProvider::new("test", "test", None)))
+            .skip_region_lookup(true)
+            .build()
+            .unwrap();
+
+        assert!(client.shared.skip_region_lookup);
+    }
+
+    #[test]
+    fn test_skip_region_lookup_builder_can_be_toggled() {
+        let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
+
+        // Enable then disable
+        let client = MinioClientBuilder::new(base_url)
+            .provider(Some(StaticProvider::new("test", "test", None)))
+            .skip_region_lookup(true)
+            .skip_region_lookup(false)
+            .build()
+            .unwrap();
+
+        assert!(!client.shared.skip_region_lookup);
+    }
+}
@@ -13,7 +13,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//! S3 client to perform bucket and object operations
+//! S3 client to perform bucket and object operations.
+//!
+//! # HTTP Version Support
+//!
+//! The client supports both HTTP/1.1 and HTTP/2. When connecting over TLS,
+//! the client will negotiate HTTP/2 via ALPN if the server supports it,
+//! otherwise it falls back to HTTP/1.1 gracefully. HTTP/2 provides better
+//! throughput for parallel requests through multiplexing.
+//!
+//! HTTP/2 support is enabled by default via the `http2` feature flag. For
+//! HTTP/1.1-only legacy S3-compatible services, you can disable it:
+//!
+//! ```toml
+//! [dependencies]
+//! minio = { version = "0.3", default-features = false, features = ["default-tls", "default-crypto"] }
+//! ```

 use bytes::Bytes;
 use dashmap::DashMap;

@@ -26,7 +41,7 @@ use std::fs::File;
 use std::io::prelude::*;
 use std::mem;
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, OnceLock, RwLock};
 use uuid::Uuid;

 use crate::s3::builders::{BucketExists, ComposeSource};

@@ -42,7 +57,7 @@ use crate::s3::multimap_ext::{Multimap, MultimapExt};
 use crate::s3::response::*;
 use crate::s3::response_traits::{HasEtagFromHeaders, HasS3Fields};
 use crate::s3::segmented_bytes::SegmentedBytes;
-use crate::s3::signer::sign_v4_s3;
+use crate::s3::signer::{SigningKeyCache, sign_v4_s3};
 use crate::s3::utils::{EMPTY_SHA256, check_ssec_with_log, sha256_hash_sb, to_amz_date, utc_now};

 mod append_object;

@@ -143,6 +158,103 @@ impl Iterator for BodyIterator {
 /// exceeds this count, each part must be larger to remain within the limit.
 pub const MAX_MULTIPART_COUNT: u16 = 10_000;
+
+/// Configuration for the HTTP connection pool.
+///
+/// These settings allow tuning the client for different workloads:
+/// - **High-throughput**: Increase `max_idle_per_host` and `idle_timeout`
+/// - **Low-latency**: Enable `tcp_nodelay` (default)
+/// - **Resource-constrained**: Reduce `max_idle_per_host` and `idle_timeout`
+///
+/// # Example
+///
+/// ```
+/// use minio::s3::client::ConnectionPoolConfig;
+/// use std::time::Duration;
+///
+/// // High-throughput configuration
+/// let config = ConnectionPoolConfig::default()
+///     .max_idle_per_host(64)
+///     .idle_timeout(Duration::from_secs(120));
+///
+/// // Resource-constrained configuration
+/// let config = ConnectionPoolConfig::default()
+///     .max_idle_per_host(4)
+///     .idle_timeout(Duration::from_secs(30));
+/// ```
+#[derive(Debug, Clone)]
+pub struct ConnectionPoolConfig {
+    /// Maximum number of idle connections per host.
+    ///
+    /// Higher values allow more parallel requests but consume more memory.
+    /// Default: 32 (optimized for parallel S3 operations)
+    pub max_idle_per_host: usize,
+
+    /// How long idle connections are kept in the pool.
+    ///
+    /// Longer timeouts reduce reconnection overhead but increase memory usage.
+    /// Default: 90 seconds
+    pub idle_timeout: std::time::Duration,
+
+    /// TCP keepalive interval.
+    ///
+    /// Helps detect dead connections and keeps connections alive through NAT/firewalls.
+    /// Default: 60 seconds
+    pub tcp_keepalive: std::time::Duration,
+
+    /// Enable TCP_NODELAY (disable Nagle's algorithm).
+    ///
+    /// Reduces latency for small requests but may reduce throughput on
+    /// high-bandwidth, high-latency links. Default: true
+    pub tcp_nodelay: bool,
+}
+
+impl Default for ConnectionPoolConfig {
+    fn default() -> Self {
+        Self {
+            max_idle_per_host: 32,
+            idle_timeout: std::time::Duration::from_secs(90),
+            tcp_keepalive: std::time::Duration::from_secs(60),
+            tcp_nodelay: true,
+        }
+    }
+}
+
+impl ConnectionPoolConfig {
+    /// Set the maximum number of idle connections per host.
+    ///
+    /// Higher values allow more parallel requests but consume more memory.
+    /// Typical values: 2-8 for light usage, 16-64 for heavy parallel workloads.
+    pub fn max_idle_per_host(mut self, max: usize) -> Self {
+        self.max_idle_per_host = max;
+        self
+    }
+
+    /// Set how long idle connections are kept in the pool.
+    ///
+    /// Longer timeouts reduce reconnection overhead but increase memory usage.
+    pub fn idle_timeout(mut self, timeout: std::time::Duration) -> Self {
+        self.idle_timeout = timeout;
+        self
+    }
+
+    /// Set the TCP keepalive interval.
+    ///
+    /// Helps detect dead connections and keeps connections alive through NAT/firewalls.
+    pub fn tcp_keepalive(mut self, interval: std::time::Duration) -> Self {
+        self.tcp_keepalive = interval;
+        self
+    }
+
+    /// Enable or disable TCP_NODELAY (Nagle's algorithm).
+    ///
+    /// When enabled (default), reduces latency for small requests.
+    /// Disable for better throughput on high-bandwidth, high-latency links.
+    pub fn tcp_nodelay(mut self, enable: bool) -> Self {
+        self.tcp_nodelay = enable;
+        self
+    }
+}
+
 /// Client Builder manufactures a Client using given parameters.
 /// Creates a builder given a base URL for the MinIO service or other AWS S3
 /// compatible object storage service.
@@ -158,6 +270,10 @@ pub struct MinioClientBuilder {
     ignore_cert_check: Option<bool>,
     /// Set the app info as an Option of (app_name, app_version) pair. This will show up in the client's user-agent.
     app_info: Option<(String, String)>,
+    /// Skip region lookup for MinIO servers (region is not used by MinIO).
+    skip_region_lookup: bool,
+    /// HTTP connection pool configuration.
+    connection_pool_config: ConnectionPoolConfig,
 }

 impl MinioClientBuilder {

@@ -171,6 +287,8 @@ impl MinioClientBuilder {
             ssl_cert_file: None,
             ignore_cert_check: None,
             app_info: None,
+            skip_region_lookup: false,
+            connection_pool_config: ConnectionPoolConfig::default(),
         }
     }

@@ -208,9 +326,81 @@ impl MinioClientBuilder {
         self
     }

+    /// Skip region lookup for MinIO servers.
+    ///
+    /// MinIO does not use AWS regions, so region lookup is unnecessary overhead.
+    /// When enabled, the client will use the default region ("us-east-1") for
+    /// all requests without making network calls to determine the bucket region.
+    ///
+    /// This improves performance by eliminating the first-request latency penalty
+    /// caused by region discovery.
+    ///
+    /// # Example
+    ///
+    /// ```no_run
+    /// use minio::s3::client::MinioClientBuilder;
+    /// use minio::s3::creds::StaticProvider;
+    /// use minio::s3::http::BaseUrl;
+    ///
+    /// let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
+    /// let client = MinioClientBuilder::new(base_url)
+    ///     .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None)))
+    ///     .skip_region_lookup(true)
+    ///     .build()
+    ///     .unwrap();
+    /// ```
+    pub fn skip_region_lookup(mut self, skip: bool) -> Self {
+        self.skip_region_lookup = skip;
+        self
+    }
+
+    /// Configure the HTTP connection pool settings.
+    ///
+    /// Allows tuning the client for different workloads (high-throughput,
+    /// low-latency, or resource-constrained environments).
+    ///
+    /// # Example
+    ///
+    /// ```no_run
+    /// use minio::s3::client::{MinioClientBuilder, ConnectionPoolConfig};
+    /// use minio::s3::creds::StaticProvider;
+    /// use minio::s3::http::BaseUrl;
+    /// use std::time::Duration;
+    ///
+    /// let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();
+    ///
+    /// // High-throughput configuration for parallel uploads
+    /// let client = MinioClientBuilder::new(base_url)
+    ///     .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None)))
+    ///     .connection_pool_config(
+    ///         ConnectionPoolConfig::default()
+    ///             .max_idle_per_host(64)
+    ///             .idle_timeout(Duration::from_secs(120))
+    ///     )
+    ///     .build()
+    ///     .unwrap();
+    /// ```
+    pub fn connection_pool_config(mut self, config: ConnectionPoolConfig) -> Self {
+        self.connection_pool_config = config;
+        self
+    }
+
     /// Build the Client.
     pub fn build(self) -> Result<MinioClient, Error> {
-        let mut builder = reqwest::Client::builder().no_gzip();
+        let pool_config = &self.connection_pool_config;
+        let mut builder = reqwest::Client::builder()
+            .no_gzip()
+            .tcp_nodelay(pool_config.tcp_nodelay)
+            .tcp_keepalive(pool_config.tcp_keepalive)
+            .pool_max_idle_per_host(pool_config.max_idle_per_host)
+            .pool_idle_timeout(pool_config.idle_timeout);
+
+        // HTTP/2 adaptive window improves throughput when server supports HTTP/2.
+        // Has no effect with HTTP/1.1-only servers (graceful fallback).
+        #[cfg(feature = "http2")]
+        {
+            builder = builder.http2_adaptive_window(true);
+        }

         let mut user_agent = String::from("MinIO (")
             + std::env::consts::OS

@@ -257,6 +447,8 @@ impl MinioClientBuilder {
                 client_hooks: self.client_hooks,
                 region_map: Default::default(),
                 express: Default::default(),
+                skip_region_lookup: self.skip_region_lookup,
+                signing_key_cache: RwLock::new(SigningKeyCache::new()),
             }),
         })
     }
@@ -537,6 +729,7 @@ impl MinioClient {
             headers.add(X_AMZ_SECURITY_TOKEN, creds.session_token.unwrap());
         }
         sign_v4_s3(
+            &self.shared.signing_key_cache,
             method,
             &url.path,
             region,
@@ -729,6 +922,145 @@ impl MinioClient {
         Ok(())
     }
+
+    /// Fast-path GET request that bypasses the general S3 API overhead.
+    ///
+    /// This method is optimized for high-performance object retrieval scenarios
+    /// like DataFusion/ObjectStore integration where minimal latency is critical.
+    ///
+    /// Returns the raw reqwest Response for direct stream access.
+    ///
+    /// # Arguments
+    /// * `bucket` - The bucket name (validated)
+    /// * `object` - The object key (validated)
+    /// * `range` - Optional byte range as (offset, length). If length is 0 or None, reads from offset to end.
+    ///
+    /// # Important Limitations
+    ///
+    /// This method bypasses several standard client features for performance:
+    ///
+    /// - **No hooks**: Client hooks registered via [`MinioClientBuilder::hook`] are NOT called.
+    ///   This means custom authentication, logging, metrics, or request modification will not apply.
+    /// - **ALWAYS skips region lookup**: Unconditionally uses the default region ("us-east-1"),
+    ///   **ignoring** the client's [`skip_region_lookup`](MinioClientBuilder::skip_region_lookup) setting.
+    ///   This is correct for MinIO servers but **WILL FAIL** for AWS S3 buckets in non-default regions.
+    ///   If your client is configured with `skip_region_lookup(false)` expecting region lookups to work,
+    ///   this method will silently bypass that configuration and use "us-east-1" anyway.
+    /// - **No extra headers**: Does not add custom headers that might be configured elsewhere.
+    ///
+    /// # When to Use
+    ///
+    /// Use this method when:
+    /// - You need maximum throughput for bulk data retrieval
+    /// - You're integrating with systems like Apache Arrow/DataFusion
+    /// - You've already validated bucket/object names upstream
+    /// - You don't need hook functionality (logging, metrics, custom auth)
+    ///
+    /// # When NOT to Use
+    ///
+    /// Use the standard [`get_object`](MinioClient::get_object) API when:
+    /// - You need hook support for authentication, logging, or monitoring
+    /// - You're working with AWS S3 buckets that may be in non-default regions
+    /// - Your client has `skip_region_lookup(false)` and expects region lookups to work
+    /// - You want the full feature set of the SDK
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - Bucket name is invalid (same validation as standard API)
+    /// - Object name is invalid (same validation as standard API)
+    /// - The server returns a non-success status code
+    pub async fn get_object_fast(
+        &self,
+        bucket: &str,
+        object: &str,
+        range: Option<(u64, Option<u64>)>,
+    ) -> Result<reqwest::Response, Error> {
+        use crate::s3::utils::{check_bucket_name, check_object_name};
+
+        // Validate inputs (same as standard API)
+        check_bucket_name(bucket, true)?;
+        check_object_name(object)?;
+
+        // Use default region (skip region lookup for performance)
+        let region = DEFAULT_REGION;
+
+        // Build URL directly (no query params for GET)
+        let url = self.shared.base_url.build_url(
+            &Method::GET,
+            region,
+            &Multimap::new(),
+            Some(bucket),
+            Some(object),
+        )?;
+
+        // Build headers in Multimap (single source of truth)
+        let date = utc_now();
+        let mut headers = Multimap::new();
+        headers.add(HOST, url.host_header_value());
+        headers.add(X_AMZ_DATE, to_amz_date(date));
+        headers.add(X_AMZ_CONTENT_SHA256, EMPTY_SHA256);
+
+        // Add range header if specified
+        if let Some((offset, length)) = range {
+            let range_str = match length {
+                Some(len) if len > 0 => format!("bytes={}-{}", offset, offset + len - 1),
+                _ => format!("bytes={}-", offset),
+            };
+            headers.add(RANGE, range_str);
+        }
+
+        // Sign the request if we have credentials
+        if let Some(provider) = &self.shared.provider {
+            let creds = provider.fetch();
+            if let Some(token) = &creds.session_token {
+                headers.add(X_AMZ_SECURITY_TOKEN, token);
+            }
+
+            sign_v4_s3(
+                &self.shared.signing_key_cache,
+                &Method::GET,
+                &url.path,
+                region,
+                &mut headers,
+                &Multimap::new(),
+                &creds.access_key,
+                &creds.secret_key,
+                EMPTY_SHA256,
+                date,
+            );
+        }
+
+        // Build reqwest request and transfer all headers
+        let mut req = self.http_client.get(url.to_string());
+        for (key, values) in headers.iter_all() {
+            for value in values {
+                req = req.header(key, value);
+            }
+        }
+
+        // Send request
+        let resp = req.send().await.map_err(ValidationErr::from)?;
+
+        if resp.status().is_success() {
+            return Ok(resp);
+        }
+
+        // Handle error response
+        let status = resp.status();
+        Err(Error::S3Server(S3ServerError::S3Error(Box::new(
+            MinioErrorResponse::from_status_and_message(
+                status.as_u16(),
+                format!(
+                    "GET object failed with status {} ({}): {}/{}",
+                    status.as_u16(),
+                    status.canonical_reason().unwrap_or("Unknown"),
+                    bucket,
+                    object
+                ),
+            ),
+        ))))
+    }

     /// create an example client for testing on localhost
     #[cfg(feature = "localhost")]
     pub fn create_client_on_localhost()
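A hedged usage sketch for the ranged variant of the fast path added above; the helper name and the bucket/object names are illustrative, and the body is streamed through reqwest's own API since `get_object_fast` hands back the raw response.

```rust
use futures_util::StreamExt;
use minio::s3::client::MinioClient;

/// Illustrative helper: stream the first 1 KiB of an object through the fast
/// path and return how many bytes were actually received.
async fn read_first_kib(client: &MinioClient) -> u64 {
    // range = (offset, length); a length of None or Some(0) reads to the end.
    let resp = client
        .get_object_fast("my-bucket", "my-object", Some((0, Some(1024))))
        .await
        .expect("fast-path GET failed");

    // The raw reqwest::Response exposes a chunked body stream directly.
    let mut stream = resp.bytes_stream();
    let mut total = 0u64;
    while let Some(chunk) = stream.next().await {
        total += chunk.expect("stream error").len() as u64;
    }
    total
}
```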
@@ -745,13 +1077,18 @@
     }
 }

-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub(crate) struct SharedClientItems {
     pub(crate) base_url: BaseUrl,
     pub(crate) provider: Option<Arc<dyn Provider + Send + Sync + 'static>>,
     client_hooks: Vec<Arc<dyn RequestHooks + Send + Sync + 'static>>,
     region_map: DashMap<String, String>,
     express: OnceLock<bool>,
+    pub(crate) skip_region_lookup: bool,
+    /// Cached precomputation of AWS Signature V4 signing keys.
+    /// Stored per-client to support multiple clients with different credentials
+    /// in the same process.
+    pub(crate) signing_key_cache: RwLock<SigningKeyCache>,
 }

 impl SharedClientItems {
@@ -17,7 +17,12 @@ use crate::s3::builders::{StatObject, StatObjectBldr};
 use crate::s3::client::MinioClient;

 impl MinioClient {
-    /// Creates a [`StatObject`] request builder. Given a bucket and object name, return some statistics.
+    /// Creates a [`StatObject`] request builder to retrieve object metadata.
+    ///
+    /// This operation uses the HTTP HEAD method (S3 HeadObject API) to efficiently
+    /// retrieve object metadata without downloading the object body. This is the
+    /// standard and most efficient way to check if an object exists and get its
+    /// metadata (size, ETag, Content-Type, user metadata, etc.).
     ///
     /// To execute the request, call [`StatObject::send()`](crate::s3::types::S3Api::send),
     /// which returns a [`Result`] containing a [`StatObjectResponse`](crate::s3::response::StatObjectResponse).
@@ -31,5 +31,5 @@ pub mod types;
 pub mod utils;

 // Re-export types module contents for convenience
-pub use client::{MinioClient, MinioClientBuilder};
+pub use client::{ConnectionPoolConfig, MinioClient, MinioClientBuilder};
 pub use types::{header_constants, lifecycle_config, minio_error_response, sse};
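With `ConnectionPoolConfig` now re-exported, downstream code can use the shorter path; a small hedged sketch (the values mirror the high-throughput example from the `ConnectionPoolConfig` docs added earlier in this diff):

```rust
use minio::s3::ConnectionPoolConfig; // re-exported above; same type as minio::s3::client::ConnectionPoolConfig
use std::time::Duration;

fn pool_config_for_bulk_io() -> ConnectionPoolConfig {
    // High-throughput settings: more idle connections, kept alive longer.
    ConnectionPoolConfig::default()
        .max_idle_per_host(64)
        .idle_timeout(Duration::from_secs(120))
}
```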
@@ -14,13 +14,38 @@
 // limitations under the License.

 use crate::s3::utils::url_encode;
-use lazy_static::lazy_static;
-use regex::Regex;
+use std::borrow::Cow;
 use std::collections::BTreeMap;

 /// Multimap for string key and string value
 pub type Multimap = multimap::MultiMap<String, String>;
+
+/// Collapses multiple spaces into a single space (avoids regex overhead).
+///
+/// Returns `Cow::Borrowed` when no transformation is needed (common case),
+/// avoiding allocation for header values that don't contain consecutive spaces.
+#[inline]
+fn collapse_spaces(s: &str) -> Cow<'_, str> {
+    let trimmed = s.trim();
+    if !trimmed.contains("  ") {
+        return Cow::Borrowed(trimmed);
+    }
+    let mut result = String::with_capacity(trimmed.len());
+    let mut prev_space = false;
+    for c in trimmed.chars() {
+        if c == ' ' {
+            if !prev_space {
+                result.push(' ');
+                prev_space = true;
+            }
+        } else {
+            result.push(c);
+            prev_space = false;
+        }
+    }
+    Cow::Owned(result)
+}

 pub trait MultimapExt {
     /// Adds a key-value pair to the multimap
     fn add<K: Into<String>, V: Into<String>>(&mut self, key: K, value: V);
@@ -76,60 +101,77 @@ impl MultimapExt for Multimap {
     }

     fn get_canonical_query_string(&self) -> String {
-        let mut keys: Vec<String> = Vec::new();
-        for (key, _) in self.iter() {
-            keys.push(key.to_string());
-        }
-        keys.sort();
-
-        let mut query = String::new();
-        for key in keys {
-            match self.get_vec(key.as_str()) {
-                Some(values) => {
-                    for value in values {
-                        if !query.is_empty() {
-                            query.push('&');
-                        }
-                        query.push_str(&url_encode(key.as_str()));
-                        query.push('=');
-                        query.push_str(&url_encode(value));
-                    }
-                }
-                None => todo!(), // This never happens.
-            };
-        }
+        // Use BTreeMap for automatic sorting (avoids explicit sort)
+        let mut sorted: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
+        let mut total_len = 0usize;
+
+        for (key, values) in self.iter_all() {
+            for value in values {
+                // Pre-calculate total length to avoid reallocations.
+                // Most S3 query params are alphanumeric (uploadId, partNumber, versionId)
+                // so we use actual length + 20% buffer for occasional URL encoding.
+                total_len += key.len() + 1 + value.len() + 2; // key=value&
+            }
+            sorted
+                .entry(key.as_str())
+                .or_default()
+                .extend(values.iter().map(|s| s.as_str()));
+        }
+
+        // Add 20% buffer for URL encoding overhead
+        let mut query = String::with_capacity(total_len + total_len / 5);
+        for (key, values) in sorted {
+            for value in values {
+                if !query.is_empty() {
+                    query.push('&');
+                }
+                query.push_str(&url_encode(key));
+                query.push('=');
+                query.push_str(&url_encode(value));
+            }
+        }

         query
     }

     fn get_canonical_headers(&self) -> (String, String) {
-        lazy_static! {
-            static ref MULTI_SPACE_REGEX: Regex = Regex::new("( +)").unwrap();
-        }
+        // Use BTreeMap for automatic sorting (avoids explicit sort)
         let mut btmap: BTreeMap<String, String> = BTreeMap::new();

+        // Pre-calculate sizes for better allocation
+        let mut key_bytes = 0usize;
+        let mut value_bytes = 0usize;
+
         for (k, values) in self.iter_all() {
             let key = k.to_lowercase();
-            if "authorization" == key || "user-agent" == key {
+            if key == "authorization" || key == "user-agent" {
                 continue;
             }

-            let mut vs = values.clone();
+            // Sort values in place if needed
+            let mut vs: Vec<&String> = values.iter().collect();
             vs.sort();

-            let mut value = String::new();
+            let mut value =
+                String::with_capacity(vs.iter().map(|v| v.len()).sum::<usize>() + vs.len());
             for v in vs {
                 if !value.is_empty() {
                     value.push(',');
                 }
-                let s: String = MULTI_SPACE_REGEX.replace_all(&v, " ").trim().to_string();
-                value.push_str(&s);
+                value.push_str(&collapse_spaces(v));
             }
-            btmap.insert(key.clone(), value.clone());
+
+            key_bytes += key.len();
+            value_bytes += value.len();
+            btmap.insert(key, value);
         }

-        let mut signed_headers = String::new();
-        let mut canonical_headers = String::new();
+        // Pre-allocate output strings
+        let header_count = btmap.len();
+        let mut signed_headers = String::with_capacity(key_bytes + header_count);
+        let mut canonical_headers =
+            String::with_capacity(key_bytes + value_bytes + header_count * 2);

         let mut add_delim = false;
         for (key, value) in &btmap {
             if add_delim {

@@ -149,3 +191,103 @@ impl MultimapExt for Multimap {
         (signed_headers, canonical_headers)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_collapse_spaces_no_consecutive_spaces() {
+        // Should return Cow::Borrowed (no allocation)
+        let result = collapse_spaces("hello world");
+        assert_eq!(result, "hello world");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_with_consecutive_spaces() {
+        // Should return Cow::Owned with spaces collapsed
+        let result = collapse_spaces("hello  world");
+        assert_eq!(result, "hello world");
+        assert!(matches!(result, Cow::Owned(_)));
+
+        let result = collapse_spaces("hello    world");
+        assert_eq!(result, "hello world");
+
+        let result = collapse_spaces("a  b  c   d");
+        assert_eq!(result, "a b c d");
+    }
+
+    #[test]
+    fn test_collapse_spaces_multiple_groups() {
+        let result = collapse_spaces("hello  world  foo   bar");
+        assert_eq!(result, "hello world foo bar");
+    }
+
+    #[test]
+    fn test_collapse_spaces_leading_trailing() {
+        // Leading and trailing spaces should be trimmed
+        let result = collapse_spaces(" hello world ");
+        assert_eq!(result, "hello world");
+        assert!(matches!(result, Cow::Borrowed(_)));
+
+        let result = collapse_spaces(" hello  world ");
+        assert_eq!(result, "hello world");
+        assert!(matches!(result, Cow::Owned(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_only_spaces() {
+        let result = collapse_spaces("   ");
+        assert_eq!(result, "");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_empty_string() {
+        let result = collapse_spaces("");
+        assert_eq!(result, "");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_single_space() {
+        let result = collapse_spaces(" ");
+        assert_eq!(result, "");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_no_spaces() {
+        let result = collapse_spaces("helloworld");
+        assert_eq!(result, "helloworld");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_tabs_not_collapsed() {
+        // Only spaces are collapsed, not tabs
+        let result = collapse_spaces("hello\t\tworld");
+        assert_eq!(result, "hello\t\tworld");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+
+    #[test]
+    fn test_collapse_spaces_mixed_whitespace() {
+        // Tabs and spaces mixed - only consecutive spaces collapsed
+        let result = collapse_spaces("hello \t world");
+        assert_eq!(result, "hello \t world");
+    }
+
+    #[test]
+    fn test_collapse_spaces_realistic_header_value() {
+        // Realistic header value that should not need modification
+        let result = collapse_spaces("application/json");
+        assert_eq!(result, "application/json");
+        assert!(matches!(result, Cow::Borrowed(_)));
+
+        let result = collapse_spaces("bytes=0-1023");
+        assert_eq!(result, "bytes=0-1023");
+        assert!(matches!(result, Cow::Borrowed(_)));
+    }
+}
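For orientation, a hedged crate-internal sketch of how the helpers above are driven. The comments describe the conventional SigV4 shape of the output; the exact joining logic lives in the unchanged tail of `get_canonical_headers`, so nothing here is asserted against literal strings.

```rust
// Written as if inside the crate, since multimap_ext is an internal module.
use crate::s3::multimap_ext::{Multimap, MultimapExt};

fn canonical_parts_demo() {
    let mut headers = Multimap::new();
    headers.add("Host", "play.min.io");
    headers.add("X-Amz-Date", "20250101T000000Z");
    headers.add("Content-Type", "application/json");

    // Keys are lowercased and sorted via the BTreeMap; `authorization` and
    // `user-agent` are skipped. Conventionally the signed-header list looks like
    // "content-type;host;x-amz-date" with matching "name:value" canonical lines.
    let (signed_headers, canonical_headers) = headers.get_canonical_headers();
    println!("{signed_headers}\n{canonical_headers}");

    // Query parameters come out sorted and URL-encoded as "k=v&k=v".
    let mut query = Multimap::new();
    query.add("uploadId", "abc123");
    query.add("partNumber", "1");
    println!("{}", query.get_canonical_query_string());
}
```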
@@ -23,6 +23,13 @@ use bytes::Bytes;
 use futures_util::TryStreamExt;
 use http::HeaderMap;
 use std::mem;
+use std::pin::Pin;
+
+/// Type alias for a boxed byte stream with size, used by [`GetObjectResponse::into_boxed_stream`].
+pub type BoxedByteStream = (
+    Pin<Box<dyn futures_util::Stream<Item = std::io::Result<Bytes>> + Send>>,
+    u64,
+);

 pub struct GetObjectResponse {
     request: S3Request,

@@ -47,6 +54,30 @@ impl GetObjectResponse {
         Ok(ObjectContent::new_from_stream(body, Some(content_length)))
     }
+
+    /// Returns the content as a boxed stream for direct streaming access.
+    ///
+    /// This is more efficient than `content().to_stream().await` for scenarios
+    /// requiring minimal overhead, as it bypasses the async wrapper entirely.
+    /// Use this for high-throughput scenarios like DataFusion queries.
+    pub fn into_boxed_stream(self) -> Result<BoxedByteStream, Error> {
+        let content_length = self.object_size()?;
+        let stream = Box::pin(self.resp.bytes_stream().map_err(std::io::Error::other));
+        Ok((stream, content_length))
+    }
+
+    /// Consumes the response and returns all content as bytes.
+    ///
+    /// **Memory usage**: This loads the entire object into memory. For objects
+    /// larger than available RAM, this may cause out-of-memory errors. For large
+    /// objects, use [`into_boxed_stream`](Self::into_boxed_stream) to process
+    /// data incrementally.
+    pub async fn into_bytes(self) -> Result<Bytes, Error> {
+        self.resp
+            .bytes()
+            .await
+            .map_err(|e| ValidationErr::HttpError(e).into())
+    }

     /// Returns the content size (in Bytes) of the object.
     pub fn object_size(&self) -> Result<u64, ValidationErr> {
         self.resp
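A hedged consumption sketch for the new streaming accessor: given a `GetObjectResponse` obtained from the regular GET path (how it was obtained is out of scope here), the boxed stream can be drained chunk by chunk without buffering the whole object.

```rust
use futures_util::TryStreamExt;
use minio::s3::response::GetObjectResponse;

/// Illustrative helper: count the bytes of an object by draining the boxed stream.
async fn drain(resp: GetObjectResponse) -> u64 {
    let (mut stream, expected_len) = resp
        .into_boxed_stream()
        .expect("missing or unparsable content length");
    let mut seen = 0u64;
    // The stream yields std::io::Result<Bytes>, so try_next() surfaces I/O errors.
    while let Some(chunk) = stream.try_next().await.expect("stream error") {
        seen += chunk.len() as u64;
    }
    debug_assert_eq!(seen, expected_len);
    seen
}
```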
@ -1,3 +1,69 @@
//! Response traits for accessing S3 metadata from HTTP response headers.
//!
//! This module provides a collection of traits that enable typed, ergonomic access to
//! metadata from S3 API responses. These traits extract data from HTTP headers and response
//! bodies returned by various S3 operations.
//!
//! # Design Philosophy
//!
//! Rather than exposing raw headers directly, these traits provide:
//! - **Type-safe access**: Automatic parsing and type conversion
//! - **Consistent API**: Uniform method names across different response types
//! - **Composability**: Mix and match traits based on what metadata is available
//!
//! # Metadata Sources
//!
//! Metadata is available from two primary sources:
//!
//! ## 1. HEAD Requests (Metadata Only)
//!
//! Operations like [`stat_object`](crate::s3::client::MinioClient::stat_object) use HEAD requests
//! to retrieve object metadata without downloading the object body. These responses typically
//! implement traits like:
//! - [`HasVersion`]: Object version ID (via `x-amz-version-id` header)
//! - [`HasObjectSize`]: Object size in bytes (via `x-amz-object-size` or `Content-Length` header)
//! - [`HasEtagFromHeaders`]: Object ETag/hash (via `ETag` header)
//! - [`HasChecksumHeaders`]: Object checksum values (via `x-amz-checksum-*` headers)
//! - [`HasIsDeleteMarker`]: Whether the object is a delete marker (via `x-amz-delete-marker` header)
//!
//! ## 2. GET Requests (Metadata + Body)
//!
//! Operations like [`get_object`](crate::s3::client::MinioClient::get_object) return both
//! metadata headers AND the object body. These responses can implement both header-based
//! traits (above) and body-parsing traits like:
//! - [`HasEtagFromBody`]: ETag parsed from XML response body
//!
//! # Example: StatObjectResponse
//!
//! The [`StatObjectResponse`](crate::s3::response::StatObjectResponse) demonstrates how
//! multiple traits compose together. It uses a HEAD request and provides:
//!
//! ```rust,ignore
//! impl HasBucket for StatObjectResponse {}
//! impl HasRegion for StatObjectResponse {}
//! impl HasObject for StatObjectResponse {}
//! impl HasEtagFromHeaders for StatObjectResponse {}
//! impl HasIsDeleteMarker for StatObjectResponse {}
//! impl HasChecksumHeaders for StatObjectResponse {}
//! impl HasVersion for StatObjectResponse {}    // Version ID from header
//! impl HasObjectSize for StatObjectResponse {} // Size from header
//! ```
//!
//! This allows users to access metadata uniformly:
//!
//! ```rust,ignore
//! let response = client.stat_object(&args).await?;
//! let size = response.object_size();          // From HasObjectSize trait
//! let version = response.version_id();        // From HasVersion trait
//! let checksum = response.checksum_crc32c()?; // From HasChecksumHeaders trait
//! ```
//!
//! # Performance Considerations
//!
//! - **HEAD vs GET**: HEAD requests are faster when you only need metadata (no body transfer)
//! - **Header parsing**: Trait methods use `#[inline]` for zero-cost abstractions
//! - **Lazy evaluation**: Metadata is parsed on-demand, not upfront

use crate::s3::error::ValidationErr;
use crate::s3::header_constants::*;
use crate::s3::types::S3Request;
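As an illustration of the trait-based access this module documents, here is a hedged, editor-added sketch of reading header-backed metadata with a HEAD request. It assumes the `stat_object` builder and the `bucket()`/`object()`/`size()` accessors exercised in the integration tests later in this diff; bucket and object names are placeholders:

```rust
use minio::s3::MinioClient;
use minio::s3::response_traits::{HasBucket, HasObject};
use minio::s3::types::S3Api;

async fn print_object_metadata(client: &MinioClient, bucket: &str, object: &str) {
    // stat_object issues a HEAD request: headers only, no body transfer.
    let stat = client
        .stat_object(bucket, object)
        .build()
        .send()
        .await
        .unwrap();

    println!("bucket = {}", stat.bucket()); // HasBucket
    println!("object = {}", stat.object()); // HasObject
    println!("size   = {} bytes", stat.size().unwrap()); // parsed from response headers
}
```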
230 src/s3/signer.rs
@ -14,6 +14,15 @@
// limitations under the License.

//! Signature V4 for S3 API
//!
//! Includes signing key caching for performance optimization.
//! The signing key only depends on (secret_key, date, region, service),
//! so we store the last computed key and reuse it when inputs match.
//!
//! Caching is per-client to support:
//! - Multiple clients with different credentials in the same process
//! - Credential rotation where old and new credentials are used simultaneously
//! - Multi-tenant applications

use crate::s3::header_constants::*;
use crate::s3::multimap_ext::{Multimap, MultimapExt};
@ -25,6 +34,97 @@ use hyper::http::Method;
use ring::hmac;
#[cfg(not(feature = "ring"))]
use sha2::Sha256;
use std::sync::{Arc, RwLock};

/// Cached precomputation of AWS Signature V4 signing keys.
///
/// Computing a signing key requires 4 HMAC-SHA256 operations. Since the key only
/// changes when date, region, or service changes, we cache the result to avoid
/// redundant computation on subsequent requests.
///
/// This is stored per-client (in `SharedClientItems`) rather than globally to
/// support multiple clients with different credentials in the same process.
///
/// # Validation
///
/// **What we validate:**
/// - Date (YYYYMMDD): Changes daily, always validated
/// - Region: Changes per bucket, always validated
/// - Service: Always "s3", validated for correctness
///
/// **What we DON'T validate:**
/// - Secret key: Deliberately omitted for security and performance
///
/// **Why not validate secret key?**
///
/// 1. **Security**: Storing the secret key (even hashed) increases memory exposure risk
/// 2. **Performance**: Hashing the secret key on every cache check adds overhead
/// 3. **Acceptable tradeoff**: Credential rotation is rare; the caller can handle
///    authentication errors by creating a new client with updated credentials
///
/// # Concurrency
///
/// Uses RwLock to allow concurrent reads while only blocking for writes.
/// Uses Arc for zero-copy sharing of the signing key across threads.
#[derive(Debug, Clone)]
pub(crate) struct SigningKeyCache {
    /// The cached signing key (Arc allows zero-copy sharing on cache hits)
    key: Arc<[u8]>,
    /// The date string (YYYYMMDD) this key was computed for
    date_str: String,
    /// The region this key was computed for
    region: String,
    /// The service name this key was computed for
    service: String,
}

impl Default for SigningKeyCache {
    fn default() -> Self {
        Self::new()
    }
}

impl SigningKeyCache {
    pub(crate) fn new() -> Self {
        Self {
            key: Arc::from(Vec::new()),
            date_str: String::new(),
            region: String::new(),
            service: String::new(),
        }
    }

    /// Checks if the cached signing key is valid for the given parameters.
    ///
    /// Note: Does NOT validate the secret key. See struct-level documentation
    /// for the rationale behind this design decision.
    #[inline]
    fn matches(&self, date_str: &str, region: &str, service: &str) -> bool {
        // Check most likely to change first (date changes daily)
        self.date_str == date_str && self.region == region && self.service == service
    }

    /// Returns the cached signing key if it matches the given parameters.
    ///
    /// Returns `None` if the cache is invalid (different date/region/service).
    /// Uses Arc::clone for zero-copy sharing (just atomic reference count increment).
    #[inline]
    fn get_key_if_matches(&self, date_str: &str, region: &str, service: &str) -> Option<Arc<[u8]>> {
        if self.matches(date_str, region, service) {
            Some(Arc::clone(&self.key))
        } else {
            None
        }
    }

    /// Updates the cache with a new signing key and associated parameters.
    fn update(&mut self, key: Arc<[u8]>, date_str: String, region: String, service: String) {
        self.key = key;
        self.date_str = date_str;
        self.region = region;
        self.service = service;
    }
}

/// Returns HMAC hash for given key and data.
fn hmac_hash(key: &[u8], data: &[u8]) -> Vec<u8> {
@ -78,17 +178,87 @@ fn get_string_to_sign(date: UtcTime, scope: &str, canonical_request_hash: &str)
    )
}

/// Computes the signing key (uncached) for given secret key, date, region and service name.
fn compute_signing_key(
    secret_key: &str,
    date_str: &str,
    region: &str,
    service_name: &str,
) -> Vec<u8> {
    let mut key: Vec<u8> = b"AWS4".to_vec();
    key.extend(secret_key.as_bytes());

    let date_key = hmac_hash(key.as_slice(), date_str.as_bytes());
    let date_region_key = hmac_hash(date_key.as_slice(), region.as_bytes());
    let date_region_service_key = hmac_hash(date_region_key.as_slice(), service_name.as_bytes());
    hmac_hash(date_region_service_key.as_slice(), b"aws4_request")
}

/// Returns signing key of given secret key, date, region and service name.
///
/// Uses caching to avoid recomputing the signing key for every request.
/// The signing key only changes when the date (YYYYMMDD), region, or service changes,
/// so we store the last computed key and reuse it when inputs match.
///
/// # Performance
///
/// **Cache hits (common case after first request of the day per region):**
/// - Returns cached key via Arc::clone (atomic reference count increment)
/// - Multiple threads can read simultaneously via RwLock
///
/// **Cache misses (daily date change or region change):**
/// - Computes new signing key (4 HMAC-SHA256 operations)
/// - Computation happens outside the lock to avoid blocking readers
/// - Brief write lock to update cache with new key
///
/// # Credential Rotation
///
/// The cache does not validate credentials - it only checks date/region/service.
/// If credentials rotate while date/region/service remain the same, the cached
/// signing key (derived from old credentials) will be used, causing S3 to return
/// an authentication error. The caller is responsible for handling credential
/// rotation at a higher level.
fn get_signing_key(
    cache: &RwLock<SigningKeyCache>,
    secret_key: &str,
    date: UtcTime,
    region: &str,
    service_name: &str,
) -> Arc<[u8]> {
    let date_str = to_signer_date(date);

    // Fast path: try to get from cache with read lock (allows concurrent reads)
    // Zero allocations on cache hit - just Arc::clone (atomic increment)
    if let Ok(cache_guard) = cache.read()
        && let Some(key) = cache_guard.get_key_if_matches(&date_str, region, service_name)
    {
        return key;
    }

    // Cache miss - compute the signing key outside the lock (4 HMAC operations)
    // Multiple threads may compute simultaneously on cache miss, but that's acceptable
    // since HMAC is deterministic and the brief redundant computation is better than
    // blocking all threads during the expensive operation.
    let signing_key = Arc::from(compute_signing_key(
        secret_key,
        &date_str,
        region,
        service_name,
    ));

    // Update cache with write lock (brief, just updating Arc references)
    if let Ok(mut cache_guard) = cache.write() {
        cache_guard.update(
            Arc::clone(&signing_key),
            date_str,
            region.to_string(),
            service_name.to_string(),
        );
    }

    signing_key
}
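A hedged, test-style sketch of the cache behaviour described above (editor-added, not part of this change): two calls with the same date, region, and service should hand back the same `Arc`, while a different region recomputes. Conceptually this would live in signer.rs's test module; it assumes `chrono` is available there, as in the existing tests:

```rust
#[test]
fn signing_key_cache_reuses_key_for_same_inputs() {
    use chrono::{TimeZone, Utc};

    // Fresh per-client cache, date constructed as in the existing tests.
    let cache = RwLock::new(SigningKeyCache::new());
    let date = Utc.with_ymd_and_hms(2013, 5, 24, 0, 0, 0).unwrap();

    let k1 = get_signing_key(&cache, "secret", date, "us-east-1", "s3");
    let k2 = get_signing_key(&cache, "secret", date, "us-east-1", "s3");

    // Second call is a cache hit: the same Arc is handed back (no HMAC work).
    assert!(Arc::ptr_eq(&k1, &k2));

    // A different region misses the cache and yields a different key.
    let k3 = get_signing_key(&cache, "secret", date, "eu-west-1", "s3");
    assert!(!Arc::ptr_eq(&k1, &k3));
    assert_ne!(&k1[..], &k3[..]);
}
```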
/// Returns signature value for given signing key and string-to-sign.
fn get_signature(signing_key: &[u8], string_to_sign: &[u8]) -> String {
    hmac_hash_hex(signing_key, string_to_sign)
@ -108,6 +278,7 @@ fn get_authorization(

/// Signs and updates headers for given parameters.
fn sign_v4(
    cache: &RwLock<SigningKeyCache>,
    service_name: &str,
    method: &Method,
    uri: &str,
@ -131,15 +302,18 @@ fn sign_v4(
        content_sha256,
    );
    let string_to_sign = get_string_to_sign(date, &scope, &canonical_request_hash);
    let signing_key = get_signing_key(cache, secret_key, date, region, service_name);
    let signature = get_signature(&signing_key, string_to_sign.as_bytes());
    let authorization = get_authorization(access_key, &scope, &signed_headers, &signature);

    headers.add(AUTHORIZATION, authorization);
}

/// Signs and updates headers for the given S3 request parameters.
///
/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`.
pub(crate) fn sign_v4_s3(
    cache: &RwLock<SigningKeyCache>,
    method: &Method,
    uri: &str,
    region: &str,
@ -151,6 +325,7 @@ pub(crate) fn sign_v4_s3(
    date: UtcTime,
) {
    sign_v4(
        cache,
        "s3",
        method,
        uri,
@ -165,7 +340,10 @@ pub(crate) fn sign_v4_s3(
}

/// Signs and updates query parameters for the given presigned request.
///
/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`.
pub(crate) fn presign_v4(
    cache: &RwLock<SigningKeyCache>,
    method: &Method,
    host: &str,
    uri: &str,
@ -196,21 +374,24 @@ pub(crate) fn presign_v4(
        "UNSIGNED-PAYLOAD",
    );
    let string_to_sign = get_string_to_sign(date, &scope, &canonical_request_hash);
    let signing_key = get_signing_key(cache, secret_key, date, region, "s3");
    let signature = get_signature(&signing_key, string_to_sign.as_bytes());

    query_params.add(X_AMZ_SIGNATURE, signature);
}

/// Returns signature for the given presigned POST request parameters.
///
/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`.
pub(crate) fn post_presign_v4(
    cache: &RwLock<SigningKeyCache>,
    string_to_sign: &str,
    secret_key: &str,
    date: UtcTime,
    region: &str,
) -> String {
    let signing_key = get_signing_key(cache, secret_key, date, region, "s3");
    get_signature(&signing_key, string_to_sign.as_bytes())
}
#[cfg(test)]
@ -226,12 +407,18 @@ mod tests {
        Utc.with_ymd_and_hms(2013, 5, 24, 0, 0, 0).unwrap()
    }

    // Create a test cache for unit tests
    fn test_cache() -> RwLock<SigningKeyCache> {
        RwLock::new(SigningKeyCache::new())
    }

    // ===========================
    // sign_v4_s3 Tests (Public API)
    // ===========================

    #[test]
    fn test_sign_v4_s3_adds_authorization_header() {
        let cache = test_cache();
        let method = Method::GET;
        let uri = "/bucket/key";
        let region = "us-east-1";
@ -249,6 +436,7 @@ mod tests {
        let query_params = Multimap::new();

        sign_v4_s3(
            &cache,
            &method,
            uri,
            region,
@ -270,6 +458,7 @@ mod tests {

    #[test]
    fn test_sign_v4_s3_deterministic() {
        let cache = test_cache();
        let method = Method::GET;
        let uri = "/test";
        let region = "us-east-1";
@ -290,6 +479,7 @@ mod tests {
        headers2.add(X_AMZ_DATE, "20130524T000000Z");

        sign_v4_s3(
            &cache,
            &method,
            uri,
            region,
@ -302,6 +492,7 @@ mod tests {
        );

        sign_v4_s3(
            &cache,
            &method,
            uri,
            region,
@ -319,6 +510,7 @@ mod tests {

    #[test]
    fn test_sign_v4_s3_different_methods() {
        let cache = test_cache();
        let region = "us-east-1";
        let uri = "/test";
        let access_key = "test";
@ -338,6 +530,7 @@ mod tests {
        headers_put.add(X_AMZ_DATE, "20130524T000000Z");

        sign_v4_s3(
            &cache,
            &Method::GET,
            uri,
            region,
@ -350,6 +543,7 @@ mod tests {
        );

        sign_v4_s3(
            &cache,
            &Method::PUT,
            uri,
            region,
@ -370,6 +564,7 @@ mod tests {

    #[test]
    fn test_sign_v4_s3_with_special_characters() {
        let cache = test_cache();
        let method = Method::GET;
        let uri = "/bucket/my file.txt"; // Space in filename
        let region = "us-east-1";
@ -387,6 +582,7 @@ mod tests {

        // Should not panic
        sign_v4_s3(
            &cache,
            &method,
            uri,
            region,
@ -407,6 +603,7 @@ mod tests {

    #[test]
    fn test_presign_v4_adds_query_params() {
        let cache = test_cache();
        let method = Method::GET;
        let host = "s3.amazonaws.com";
        let uri = "/bucket/key";
@ -418,6 +615,7 @@ mod tests {
        let expires = 3600;

        presign_v4(
            &cache,
            &method,
            host,
            uri,
@ -440,6 +638,7 @@ mod tests {

    #[test]
    fn test_presign_v4_algorithm_value() {
        let cache = test_cache();
        let method = Method::GET;
        let host = "s3.amazonaws.com";
        let uri = "/test";
@ -451,6 +650,7 @@ mod tests {
        let expires = 3600;

        presign_v4(
            &cache,
            &method,
            host,
            uri,
@ -468,6 +668,7 @@ mod tests {

    #[test]
    fn test_presign_v4_expires_value() {
        let cache = test_cache();
        let method = Method::GET;
        let host = "s3.amazonaws.com";
        let uri = "/test";
@ -479,6 +680,7 @@ mod tests {
        let expires = 7200;

        presign_v4(
            &cache,
            &method,
            host,
            uri,
@ -496,6 +698,7 @@ mod tests {

    #[test]
    fn test_presign_v4_credential_format() {
        let cache = test_cache();
        let method = Method::GET;
        let host = "s3.amazonaws.com";
        let uri = "/test";
@ -507,6 +710,7 @@ mod tests {
        let expires = 3600;

        presign_v4(
            &cache,
            &method,
            host,
            uri,
@ -532,12 +736,13 @@ mod tests {

    #[test]
    fn test_post_presign_v4() {
        let cache = test_cache();
        let string_to_sign = "test_string_to_sign";
        let secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
        let date = get_test_date();
        let region = "us-east-1";

        let signature = post_presign_v4(&cache, string_to_sign, secret_key, date, region);

        // Should produce 64 character hex signature
        assert_eq!(signature.len(), 64);
@ -546,13 +751,14 @@ mod tests {

    #[test]
    fn test_post_presign_v4_deterministic() {
        let cache = test_cache();
        let string_to_sign = "test_string";
        let secret_key = "test_secret";
        let date = get_test_date();
        let region = "us-east-1";

        let sig1 = post_presign_v4(&cache, string_to_sign, secret_key, date, region);
        let sig2 = post_presign_v4(&cache, string_to_sign, secret_key, date, region);

        assert_eq!(sig1, sig2);
    }
@ -254,6 +254,30 @@ impl MinioErrorResponse {
        }
    }

    /// Create a minimal error response from status code and message.
    ///
    /// Used for fast-path operations where full error details aren't available.
    pub fn from_status_and_message(status_code: u16, message: String) -> Self {
        let code = match status_code {
            404 => MinioErrorCode::NoSuchKey,
            403 => MinioErrorCode::AccessDenied,
            401 => MinioErrorCode::AccessDenied,
            400 => MinioErrorCode::BadRequest,
            409 => MinioErrorCode::ResourceConflict,
            _ => MinioErrorCode::OtherError(format!("HTTP {}", status_code)),
        };
        Self {
            headers: HeaderMap::new(),
            code,
            message: Some(message),
            resource: String::new(),
            request_id: String::new(),
            host_id: String::new(),
            bucket_name: None,
            object_name: None,
        }
    }

    pub fn new_from_body(body: Bytes, headers: HeaderMap) -> Result<Self, Error> {
        let root = Element::parse(body.reader()).map_err(ValidationErr::from)?;
        Ok(Self {
@ -941,7 +941,10 @@ pub fn get_text_option(element: &Element, tag: &str) -> Option<String> {
        .and_then(|v| v.get_text().map(|s| s.to_string()))
}

/// Trims leading and trailing quotes from a string.
///
/// Takes ownership of and potentially modifies the input string in place
/// (via `drain` and `pop`). The original string is not preserved.
pub fn trim_quotes(mut s: String) -> String {
    if s.len() >= 2 && s.starts_with('"') && s.ends_with('"') {
        s.drain(0..1); // remove the leading quote
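A small editor-added usage sketch of the ownership behaviour documented above, for example when normalizing a quoted ETag value:

```rust
// Quoted ETag value, as typically returned in S3 headers.
let raw = String::from("\"d41d8cd98f00b204e9800998ecf8427e\"");
let etag = trim_quotes(raw); // `raw` is consumed here
assert_eq!(etag, "d41d8cd98f00b204e9800998ecf8427e");

// Strings without surrounding quotes pass through unchanged.
assert_eq!(trim_quotes(String::from("no-quotes")), "no-quotes");
```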
191 tests/s3/client_config.rs Normal file
@ -0,0 +1,191 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2025 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Tests for client configuration options like skip_region_lookup.

use bytes::Bytes;
use minio::s3::MinioClient;
use minio::s3::client::MinioClientBuilder;
use minio::s3::creds::StaticProvider;
use minio::s3::response::{GetObjectResponse, PutObjectContentResponse};
use minio::s3::response_traits::{HasBucket, HasObject};
use minio::s3::types::{S3Api, ToStream};
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;

/// Helper to create a client with skip_region_lookup enabled.
fn create_client_with_skip_region_lookup(ctx: &TestContext) -> MinioClient {
    let mut builder = MinioClientBuilder::new(ctx.base_url.clone())
        .provider(Some(StaticProvider::new(
            &ctx.access_key,
            &ctx.secret_key,
            None,
        )))
        .skip_region_lookup(true);

    if let Some(ignore_cert) = ctx.ignore_cert_check {
        builder = builder.ignore_cert_check(Some(ignore_cert));
    }

    if let Some(ref ssl_cert_file) = ctx.ssl_cert_file {
        builder = builder.ssl_cert_file(Some(ssl_cert_file));
    }

    builder.build().unwrap()
}

/// Test that skip_region_lookup allows basic put/get operations.
/// This verifies operations work correctly when region lookup is skipped.
#[minio_macros::test]
async fn skip_region_lookup_put_get_object(ctx: TestContext, bucket_name: String) {
    let client = create_client_with_skip_region_lookup(&ctx);
    let object_name = rand_object_name();
    let data: Bytes = Bytes::from("test data with skip_region_lookup");

    // Put object using client with skip_region_lookup
    let put_resp: PutObjectContentResponse = client
        .put_object_content(&bucket_name, &object_name, data.clone())
        .build()
        .send()
        .await
        .unwrap();

    assert_eq!(put_resp.bucket(), bucket_name);
    assert_eq!(put_resp.object(), object_name);

    // Get object using the same client
    let get_resp: GetObjectResponse = client
        .get_object(&bucket_name, &object_name)
        .build()
        .send()
        .await
        .unwrap();

    assert_eq!(get_resp.bucket(), bucket_name);
    assert_eq!(get_resp.object(), object_name);

    let got = get_resp.into_bytes().await.unwrap();
    assert_eq!(got, data);
}

/// Test that skip_region_lookup works for bucket operations.
#[minio_macros::test]
async fn skip_region_lookup_bucket_exists(ctx: TestContext, bucket_name: String) {
    let client = create_client_with_skip_region_lookup(&ctx);

    // Check bucket exists using client with skip_region_lookup
    let exists = client
        .bucket_exists(&bucket_name)
        .build()
        .send()
        .await
        .unwrap()
        .exists();

    assert!(exists, "Bucket should exist");
}

/// Test that skip_region_lookup works for list operations.
#[minio_macros::test]
async fn skip_region_lookup_list_objects(ctx: TestContext, bucket_name: String) {
    let client = create_client_with_skip_region_lookup(&ctx);

    // List objects using client with skip_region_lookup
    // Just verify the operation completes without error
    let mut stream = client.list_objects(&bucket_name).build().to_stream().await;

    use futures_util::StreamExt;
    // Consume the stream - may be empty, but should not error
    let mut count = 0;
    while let Some(result) = stream.next().await {
        // Just verify we can read items without error
        let _item = result.unwrap();
        count += 1;
        // Don't iterate forever
        if count > 100 {
            break;
        }
    }

    // Test passes if we get here without error
}

/// Test that multiple operations work in sequence with skip_region_lookup.
/// This verifies that the default region is consistently used.
#[minio_macros::test]
async fn skip_region_lookup_multiple_operations(ctx: TestContext, bucket_name: String) {
    let client = create_client_with_skip_region_lookup(&ctx);

    // Perform multiple operations to ensure consistent behavior
    for i in 0..3 {
        let object_name = format!("test-object-{}", i);
        let data: Bytes = Bytes::from(format!("data for object {}", i));

        // Put
        client
            .put_object_content(&bucket_name, &object_name, data.clone())
            .build()
            .send()
            .await
            .unwrap();

        // Get
        let resp: GetObjectResponse = client
            .get_object(&bucket_name, &object_name)
            .build()
            .send()
            .await
            .unwrap();

        let got = resp.into_bytes().await.unwrap();
        assert_eq!(got, data);

        // Delete
        client
            .delete_object(&bucket_name, &object_name)
            .build()
            .send()
            .await
            .unwrap();
    }
}

/// Test that skip_region_lookup does not affect stat_object operations.
#[minio_macros::test]
async fn skip_region_lookup_stat_object(ctx: TestContext, bucket_name: String) {
    let client = create_client_with_skip_region_lookup(&ctx);
    let object_name = rand_object_name();
    let data: Bytes = Bytes::from("test data for stat");

    // Put object
    client
        .put_object_content(&bucket_name, &object_name, data.clone())
        .build()
        .send()
        .await
        .unwrap();

    // Stat object using client with skip_region_lookup
    let stat_resp = client
        .stat_object(&bucket_name, &object_name)
        .build()
        .send()
        .await
        .unwrap();

    assert_eq!(stat_resp.bucket(), bucket_name);
    assert_eq!(stat_resp.object(), object_name);
    assert_eq!(stat_resp.size().unwrap(), data.len() as u64);
}
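Outside of tests, the same configuration could look roughly like the sketch below (editor-added). It assumes `BaseUrl` lives at `minio::s3::http::BaseUrl` and parses from a string as in the crate's examples; the endpoint and credentials are placeholders:

```rust
use minio::s3::MinioClient;
use minio::s3::client::MinioClientBuilder;
use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl;

fn build_client() -> MinioClient {
    // Placeholder endpoint and credentials.
    let base_url: BaseUrl = "http://localhost:9000".parse().unwrap();

    MinioClientBuilder::new(base_url)
        .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None)))
        // Skip the bucket-region lookup round trip; the default region
        // is used for signing instead.
        .skip_region_lookup(true)
        .build()
        .unwrap()
}
```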
@ -14,6 +14,7 @@
// limitations under the License.

use bytes::Bytes;
use futures_util::TryStreamExt;
use minio::s3::response::{GetObjectResponse, PutObjectContentResponse};
use minio::s3::response_traits::{HasBucket, HasObject};
use minio::s3::types::S3Api;
@ -65,3 +66,129 @@ async fn get_object_1(ctx: TestContext, bucket_name: String) {
async fn get_object_2(ctx: TestContext, bucket_name: String) {
    test_get_object(&ctx, &bucket_name, "a b+c").await;
}

/// Test into_bytes method for direct byte retrieval.
#[minio_macros::test]
async fn get_object_into_bytes(ctx: TestContext, bucket_name: String) {
    let object_name = rand_object_name_utf8(20);
    let data: Bytes = Bytes::from("test data for into_bytes method");

    // Upload test object
    ctx.client
        .put_object_content(&bucket_name, &object_name, data.clone())
        .build()
        .send()
        .await
        .unwrap();

    // Retrieve using into_bytes
    let resp: GetObjectResponse = ctx
        .client
        .get_object(&bucket_name, &object_name)
        .build()
        .send()
        .await
        .unwrap();

    // Verify content-length before consuming
    assert_eq!(resp.object_size().unwrap(), data.len() as u64);

    // Get bytes directly
    let got = resp.into_bytes().await.unwrap();
    assert_eq!(got, data);
}

/// Test into_boxed_stream method for streaming access.
#[minio_macros::test]
async fn get_object_into_boxed_stream(ctx: TestContext, bucket_name: String) {
    let object_name = rand_object_name_utf8(20);
    let data: Bytes = Bytes::from("test data for into_boxed_stream method");

    // Upload test object
    ctx.client
        .put_object_content(&bucket_name, &object_name, data.clone())
        .build()
        .send()
        .await
        .unwrap();

    // Retrieve using into_boxed_stream
    let resp: GetObjectResponse = ctx
        .client
        .get_object(&bucket_name, &object_name)
        .build()
        .send()
        .await
        .unwrap();

    // Get stream and content length
    let (stream, content_length) = resp.into_boxed_stream().unwrap();
    assert_eq!(content_length, data.len() as u64);

    // Collect all bytes from the stream
    let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
    let got: Bytes = chunks.into_iter().flatten().collect();
    assert_eq!(got, data);
}

/// Test into_boxed_stream with larger content to verify chunked streaming.
#[minio_macros::test]
async fn get_object_into_boxed_stream_large(ctx: TestContext, bucket_name: String) {
    let object_name = rand_object_name_utf8(20);
    // Create larger test data (1MB) to ensure multiple chunks
    let data: Bytes = Bytes::from(vec![0xABu8; 1024 * 1024]);

    // Upload test object
    ctx.client
        .put_object_content(&bucket_name, &object_name, data.clone())
        .build()
        .send()
        .await
        .unwrap();

    // Retrieve using into_boxed_stream
    let resp: GetObjectResponse = ctx
        .client
        .get_object(&bucket_name, &object_name)
        .build()
        .send()
        .await
        .unwrap();

    let (stream, content_length) = resp.into_boxed_stream().unwrap();
    assert_eq!(content_length, data.len() as u64);

    // Collect and verify
    let chunks: Vec<Bytes> = stream.try_collect().await.unwrap();
    let got: Bytes = chunks.into_iter().flatten().collect();
    assert_eq!(got.len(), data.len());
    assert_eq!(got, data);
}

/// Test into_bytes with empty content.
#[minio_macros::test]
async fn get_object_into_bytes_empty(ctx: TestContext, bucket_name: String) {
    let object_name = rand_object_name_utf8(20);
    let data: Bytes = Bytes::new();

    // Upload empty object
    ctx.client
        .put_object_content(&bucket_name, &object_name, data.clone())
        .build()
        .send()
        .await
        .unwrap();

    // Retrieve using into_bytes
    let resp: GetObjectResponse = ctx
        .client
        .get_object(&bucket_name, &object_name)
        .build()
        .send()
        .await
        .unwrap();

    assert_eq!(resp.object_size().unwrap(), 0);
    let got = resp.into_bytes().await.unwrap();
    assert!(got.is_empty());
}
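Where buffering is undesirable, the boxed stream can also be consumed incrementally instead of collected. A hedged, editor-added sketch that hashes an object chunk by chunk, assuming the `sha2` crate (already referenced by the signer) and the `Result<Bytes, _>` item type implied by `try_collect` above:

```rust
use futures_util::TryStreamExt;
use minio::s3::response::GetObjectResponse;
use sha2::{Digest, Sha256};

// `resp` is obtained via get_object(...).build().send(), as in the tests above.
async fn hash_object(resp: GetObjectResponse) -> [u8; 32] {
    let (mut stream, _content_length) = resp.into_boxed_stream().unwrap();
    let mut hasher = Sha256::new();
    while let Some(chunk) = stream.try_next().await.unwrap() {
        // Hash each chunk as it arrives; the full body is never held in memory.
        hasher.update(&chunk);
    }
    hasher.finalize().into()
}
```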
@ -15,6 +15,9 @@

//! S3 API Integration Tests

// Client configuration
mod client_config;

// Object operations
mod append_object;
mod get_object;