diff --git a/CLAUDE.md b/CLAUDE.md index 274c6c1..702d1a9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -34,14 +34,14 @@ All source files that haven't been generated MUST include the following copyrigh ### Comments - **NO redundant comments** - Code should be self-documenting -- Avoid obvious comments like `// Set x to 5` for `x := 5` +- Avoid obvious comments like `// Set x to 5` for `let x = 5;` - Only add comments when they explain WHY, not WHAT - Document complex algorithms or non-obvious business logic ## Critical Code Patterns ### Builder Pattern -All S3 API requests MUST use the builder pattern, with the following documentation but then for the appropriate API +All S3 API requests MUST use the builder pattern, with documentation similar to the following example (adapted for each specific API) ```rust /// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation. @@ -73,6 +73,33 @@ impl Client { } ``` +### Rust-Specific Best Practices + +1. **Ownership and Borrowing** + - Prefer `&str` over `&String` in function parameters + - Use `AsRef` or `Into` for flexible string parameters + - Return owned types from functions unless lifetime annotations are clear + +2. **Type Safety** + - Use `#[must_use]` attribute for functions returning important values + - Prefer strong typing over primitive obsession + - Use newtypes for domain-specific values + +3. **Unsafe Code** + - Avoid `unsafe` code unless absolutely necessary + - Document all safety invariants when `unsafe` is required + - Isolate `unsafe` blocks and keep them minimal + +4. **Performance** + - Use `Cow<'_, str>` to avoid unnecessary allocations + - Prefer iterators over collecting into intermediate vectors + - Use `Box` sparingly; prefer generics when possible + +5. **Async Patterns** + - Use `tokio::select!` for concurrent operations + - Avoid blocking operations in async contexts + - Use `async-trait` for async trait methods + ## Code Quality Principles ### Why Code Quality Standards Are Mandatory @@ -126,8 +153,9 @@ Complex distributed systems code must remain **human-readable**: ### Variables - Use meaningful variable names that reflect business concepts - Variable names should reflect usage frequency: frequent variables can be shorter -- Constants should follow Rust patterns -- Global variables should be clearly identified and documented for their system-wide purpose +- Constants should use SCREAMING_SNAKE_CASE (e.g., `MAX_RETRIES`, `DEFAULT_TIMEOUT`) +- Static variables should be clearly identified with proper safety documentation +- Prefer `const` over `static` when possible for compile-time constants ### Developer Documentation @@ -179,7 +207,7 @@ Claude will periodically analyze the codebase and suggest: Before any code changes: 1. ✅ Run `cargo fmt --all` to check and fix code formatting 2. ✅ Run `cargo test` to ensure all tests pass -3. ✅ Run `cargo clippy` to check for common mistakes +3. ✅ Run `cargo clippy --all-targets --all-features --workspace -- -D warnings` to check for common mistakes and ensure no warnings 4. ✅ Ensure new code has appropriate test coverage 5. 
✅ Verify no redundant comments are added @@ -222,6 +250,6 @@ fn operation() -> Result { - **Fix formatting**: `cargo fmt --all` - **Run tests**: `cargo test` - **Run specific test**: `cargo test test_name` -- **Check code**: `cargo clippy` +- **Check code**: `cargo clippy --all-targets --all-features --workspace -- -D warnings` - **Build project**: `cargo build --release` -- **Generate docs**: `cargo doc --open` +- **Generate docs**: `cargo doc --open` \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a67d1b7..7150083 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,10 +56,10 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = { version = "0.10", optional = true } urlencoding = "2.1" -xmltree = "0.11" +xmltree = "0.12" http = "1.3" thiserror = "2.0" -typed-builder = "0.22" +typed-builder = "0.23" [dev-dependencies] minio-common = { path = "./common" } @@ -87,6 +87,9 @@ name = "object_prompt" [[example]] name = "append_object" +[[example]] +name = "load_balancing_with_hooks" + [[bench]] name = "s3-api" path = "benches/s3/api_benchmarks.rs" diff --git a/common/src/test_context.rs b/common/src/test_context.rs index de4d221..4d2d989 100644 --- a/common/src/test_context.rs +++ b/common/src/test_context.rs @@ -144,7 +144,7 @@ impl TestContext { /// - `CleanupGuard` - A guard that automatically deletes the bucket when dropped. /// /// # Example - /// ```ignore + /// ```no_run /// let (bucket_name, guard) = client.create_bucket_helper().await; /// println!("Created temporary bucket: {}", bucket_name); /// // The bucket will be removed when `guard` is dropped. diff --git a/examples/debug_logging_hook.rs b/examples/debug_logging_hook.rs new file mode 100644 index 0000000..6ed71d1 --- /dev/null +++ b/examples/debug_logging_hook.rs @@ -0,0 +1,235 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example demonstrating how to use RequestHooks for debug logging. +//! +//! This example shows: +//! - Creating a custom debug logging hook +//! - Attaching the hook to the MinIO client +//! - Automatic logging of all S3 API requests with headers and response status +//! - Using both `before_signing_mut` and `after_execute` hooks +//! +//! Run with default values (test-bucket / test-object.txt, verbose mode enabled): +//! ``` +//! cargo run --example debug_logging_hook +//! ``` +//! +//! Run with custom bucket and object: +//! ``` +//! cargo run --example debug_logging_hook -- mybucket myobject +//! ``` +//! +//! Disable verbose output: +//! ``` +//! cargo run --example debug_logging_hook -- --no-verbose +//! 
``` + +use clap::{ArgAction, Parser}; +use futures_util::StreamExt; +use minio::s3::builders::ObjectContent; +use minio::s3::client::hooks::{Extensions, RequestHooks}; +use minio::s3::client::{Method, Response}; +use minio::s3::creds::StaticProvider; +use minio::s3::error::Error; +use minio::s3::http::Url; +use minio::s3::multimap_ext::Multimap; +use minio::s3::response::BucketExistsResponse; +use minio::s3::segmented_bytes::SegmentedBytes; +use minio::s3::types::{S3Api, ToStream}; +use minio::s3::{MinioClient, MinioClientBuilder}; +use std::sync::Arc; + +/// Debug logging hook that prints detailed information about each S3 request. +#[derive(Debug)] +struct DebugLoggingHook { + /// Enable verbose output including all headers + verbose: bool, +} + +impl DebugLoggingHook { + fn new(verbose: bool) -> Self { + Self { verbose } + } +} + +#[async_trait::async_trait] +impl RequestHooks for DebugLoggingHook { + fn name(&self) -> &'static str { + "debug-logger" + } + + async fn before_signing_mut( + &self, + method: &Method, + url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + bucket_name: Option<&str>, + object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + _extensions: &mut Extensions, + ) -> Result<(), Error> { + if self.verbose { + let bucket_obj = match (bucket_name, object_name) { + (Some(b), Some(o)) => format!("{b}/{o}"), + (Some(b), None) => b.to_string(), + _ => url.to_string(), + }; + println!("→ Preparing {method} request for {bucket_obj}"); + } + Ok(()) + } + + async fn after_execute( + &self, + method: &Method, + url: &Url, + _region: &str, + headers: &Multimap, + _query_params: &Multimap, + bucket_name: Option<&str>, + object_name: Option<&str>, + resp: &Result, + _extensions: &mut Extensions, + ) { + // Format the basic request info + let bucket_obj = match (bucket_name, object_name) { + (Some(b), Some(o)) => format!("{b}/{o}"), + (Some(b), None) => b.to_string(), + _ => url.to_string(), + }; + + // Format response status + let status = match resp { + Ok(response) => format!("✓ {}", response.status()), + Err(err) => format!("✗ Error: {err}"), + }; + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); + println!("S3 Request: {method} {bucket_obj}"); + println!("URL: {url}"); + println!("Status: {status}"); + + if self.verbose { + // Print headers alphabetically + let mut header_strings: Vec = headers + .iter_all() + .map(|(k, v)| format!("{}: {}", k, v.join(","))) + .collect(); + header_strings.sort(); + + println!("\nRequest Headers:"); + for header in header_strings { + println!(" {header}"); + } + } + + println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"); + } +} + +/// Example demonstrating debug logging with hooks +#[derive(Parser)] +struct Cli { + /// Bucket to use for the example + #[arg(default_value = "test-bucket")] + bucket: String, + /// Object to upload + #[arg(default_value = "test-object.txt")] + object: String, + /// Disable verbose output (verbose is enabled by default, use --no-verbose to disable) + #[arg(long = "no-verbose", action = ArgAction::SetFalse, default_value_t = true)] + verbose: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + let args = Cli::parse(); + + println!("\n🔧 MinIO Debug Logging Hook Example\n"); + println!("This example demonstrates how hooks can be used for debugging S3 requests."); + println!( + "We'll perform a few operations on bucket '{}' with debug logging enabled.\n", + args.bucket + ); + + // Create the debug logging 
hook + let debug_hook = Arc::new(DebugLoggingHook::new(args.verbose)); + + // Create MinIO client with the debug logging hook attached + let static_provider = StaticProvider::new( + "Q3AM3UQ867SPQQA43P2F", + "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", + None, + ); + + let client: MinioClient = MinioClientBuilder::new("https://play.min.io".parse()?) + .provider(Some(static_provider)) + .hook(debug_hook) // Attach the debug logging hook + .build()?; + + println!("✓ Created MinIO client with debug logging hook\n"); + + // Operation 1: Check if bucket exists + println!("📋 Checking if bucket exists..."); + let resp: BucketExistsResponse = client.bucket_exists(&args.bucket).build().send().await?; + + // Operation 2: Create bucket if it doesn't exist + if !resp.exists() { + println!("\n📋 Creating bucket..."); + client.create_bucket(&args.bucket).build().send().await?; + } else { + println!("\n✓ Bucket already exists"); + } + + // Operation 3: Upload a small object + println!("\n📋 Uploading object..."); + let content = b"Hello from MinIO Rust SDK with debug logging!"; + let object_content: ObjectContent = content.to_vec().into(); + client + .put_object_content(&args.bucket, &args.object, object_content) + .build() + .send() + .await?; + + // Operation 4: List objects in the bucket + println!("\n📋 Listing objects in bucket..."); + let mut list_stream = client + .list_objects(&args.bucket) + .recursive(false) + .build() + .to_stream() + .await; + + let mut total_objects = 0; + while let Some(result) = list_stream.next().await { + match result { + Ok(resp) => { + total_objects += resp.contents.len(); + } + Err(e) => { + eprintln!("Error listing objects: {e}"); + } + } + } + println!("\n✓ Found {total_objects} objects in bucket"); + + println!("\n🎉 All operations completed successfully with debug logging enabled!\n"); + println!("💡 Tip: Run with --no-verbose to disable detailed output\n"); + + Ok(()) +} diff --git a/examples/load_balancing_with_hooks.rs b/examples/load_balancing_with_hooks.rs new file mode 100644 index 0000000..b67a393 --- /dev/null +++ b/examples/load_balancing_with_hooks.rs @@ -0,0 +1,702 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # Load Balancing with Request Hooks Example +//! +//! This example demonstrates how to implement client-side load balancing for MinIO +//! using the RequestHooks infrastructure. It shows multiple load balancing strategies +//! and how to monitor their effectiveness. +//! +//! ## Key Concepts +//! +//! 1. **Request Hooks**: Intercept and modify requests before they're signed and sent +//! 2. **Load Balancing**: Distribute requests across multiple MinIO nodes +//! 3. **Health Checking**: Track node availability and response times +//! 4. **Telemetry**: Monitor load distribution and performance +//! 5. **Redirect Headers**: When URL is modified, automatic headers are added: +//! - `x-minio-redirect-from`: Original target URL +//! 
- `x-minio-redirect-to`: New destination URL after load balancing +//! +//! ## Usage +//! +//! ```bash +//! # Set up your MinIO cluster nodes +//! export MINIO_NODES="node1.minio.local:9000,node2.minio.local:9000,node3.minio.local:9000" +//! export MINIO_ROOT_USER="minioadmin" +//! export MINIO_ROOT_PASSWORD="minioadmin" +//! +//! # Run with round-robin strategy (default) +//! cargo run --example load_balancing_with_hooks +//! +//! # Run with least-connections strategy +//! cargo run --example load_balancing_with_hooks -- --strategy least-connections +//! +//! # Run with weighted round-robin +//! cargo run --example load_balancing_with_hooks -- --strategy weighted +//! ``` + +use clap::{Parser, ValueEnum}; +use http::{Extensions, Method}; +use minio::s3::client::{MinioClientBuilder, RequestHooks}; +use minio::s3::creds::StaticProvider; +use minio::s3::error::Error; +use minio::s3::http::{BaseUrl, Url}; +use minio::s3::multimap_ext::Multimap; +use minio::s3::segmented_bytes::SegmentedBytes; +use minio::s3::types::{S3Api, ToStream}; +use reqwest::Response; +use std::env; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +/// Command-line arguments for the load balancing example +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Load balancing strategy to use + #[arg(short, long, value_enum, default_value = "round-robin")] + strategy: LoadBalanceStrategy, + + /// Number of requests to make for testing + #[arg(short = 'n', long, default_value = "100")] + requests: usize, + + /// Enable verbose debug output + #[arg(short, long)] + verbose: bool, +} + +/// Available load balancing strategies +#[derive(Clone, Copy, Debug, ValueEnum)] +enum LoadBalanceStrategy { + /// Simple round-robin: cycles through nodes sequentially + RoundRobin, + /// Least connections: routes to the node with fewest active connections + LeastConnections, + /// Weighted round-robin: distributes based on node weights/capacity + Weighted, + /// Random selection: randomly picks a node for each request + Random, +} + +/// Represents a MinIO node in the cluster with its health and performance metrics +#[derive(Debug, Clone)] +struct Node { + /// The hostname or IP address of the node + host: String, + /// The port number (typically 9000 for MinIO) + port: u16, + /// Whether to use HTTPS for this node + https: bool, + /// Current number of active connections to this node + active_connections: Arc, + /// Total number of requests sent to this node + total_requests: Arc, + /// Total response time in milliseconds for all requests + total_response_time_ms: Arc, + /// Number of failed requests to this node + failed_requests: Arc, + /// Weight for weighted round-robin (higher = more traffic) + weight: u32, + /// Whether the node is currently healthy + is_healthy: Arc>, + /// Last time the node was checked for health + last_health_check: Arc>, +} + +impl Node { + /// Creates a new node from a host:port string + /// + /// # Arguments + /// * `host_port` - String in format "hostname:port" or just "hostname" (defaults to port 9000) + /// * `https` - Whether to use HTTPS for this node + /// * `weight` - Weight for weighted round-robin (default 1) + fn new(host_port: &str, https: bool, weight: u32) -> Self { + let parts: Vec<&str> = host_port.split(':').collect(); + let (host, port) = if parts.len() == 2 { + (parts[0].to_string(), parts[1].parse().unwrap_or(9000)) + } else { + 
(host_port.to_string(), 9000) + }; + + Self { + host, + port, + https, + active_connections: Arc::new(AtomicUsize::new(0)), + total_requests: Arc::new(AtomicU64::new(0)), + total_response_time_ms: Arc::new(AtomicU64::new(0)), + failed_requests: Arc::new(AtomicU64::new(0)), + weight, + is_healthy: Arc::new(RwLock::new(true)), + last_health_check: Arc::new(RwLock::new(Instant::now())), + } + } + + /// Returns the current number of active connections + fn get_active_connections(&self) -> usize { + self.active_connections.load(Ordering::Relaxed) + } + + /// Returns whether the node is currently considered healthy + fn is_healthy(&self) -> bool { + *self.is_healthy.read().unwrap() + } + + /// Calculates the average response time in milliseconds + fn average_response_time_ms(&self) -> f64 { + let total_requests = self.total_requests.load(Ordering::Relaxed); + if total_requests == 0 { + 0.0 + } else { + self.total_response_time_ms.load(Ordering::Relaxed) as f64 / total_requests as f64 + } + } + + /// Updates the node's health status based on recent failures + fn update_health_status(&self) { + let total = self.total_requests.load(Ordering::Relaxed); + let failed = self.failed_requests.load(Ordering::Relaxed); + + // Mark unhealthy if more than 50% of recent requests failed + let is_healthy = if total > 10 { + (failed as f64 / total as f64) < 0.5 + } else { + true // Not enough data to determine + }; + + *self.is_healthy.write().unwrap() = is_healthy; + *self.last_health_check.write().unwrap() = Instant::now(); + } +} + +/// Main load balancer hook that implements various load balancing strategies +/// +/// This hook intercepts all S3 requests and redirects them to different nodes +/// based on the selected strategy. It also tracks metrics for monitoring. +#[derive(Debug, Clone)] +struct LoadBalancerHook { + /// List of available MinIO nodes + nodes: Arc>, + /// Selected load balancing strategy + strategy: LoadBalanceStrategy, + /// Counter for round-robin strategy + round_robin_counter: Arc, + /// Whether to print verbose debug information + verbose: bool, +} + +impl LoadBalancerHook { + /// Creates a new load balancer hook + /// + /// # Arguments + /// * `nodes` - List of MinIO nodes to balance between + /// * `strategy` - Load balancing strategy to use + /// * `verbose` - Whether to print debug information + fn new(nodes: Vec, strategy: LoadBalanceStrategy, verbose: bool) -> Self { + if nodes.is_empty() { + panic!("At least one node must be provided"); + } + + Self { + nodes: Arc::new(nodes), + strategy, + round_robin_counter: Arc::new(AtomicUsize::new(0)), + verbose, + } + } + + /// Selects the next node based on the configured strategy + /// + /// # Returns + /// The selected node, or None if no healthy nodes are available + fn select_node(&self) -> Option { + // Filter to only healthy nodes + let healthy_nodes: Vec = self + .nodes + .iter() + .filter(|n| n.is_healthy()) + .cloned() + .collect(); + + if healthy_nodes.is_empty() { + // If no healthy nodes, try all nodes + if self.verbose { + println!("WARNING: No healthy nodes available, using all nodes"); + } + return self.select_from_nodes(&self.nodes); + } + + self.select_from_nodes(&healthy_nodes) + } + + /// Internal method to select from a given set of nodes + fn select_from_nodes(&self, nodes: &[Node]) -> Option { + if nodes.is_empty() { + return None; + } + + match self.strategy { + LoadBalanceStrategy::RoundRobin => { + // Simple round-robin: cycle through nodes sequentially + let index = self.round_robin_counter.fetch_add(1, 
Ordering::SeqCst) % nodes.len(); + Some(nodes[index].clone()) + } + + LoadBalanceStrategy::LeastConnections => { + // Select the node with the fewest active connections + nodes + .iter() + .min_by_key(|n| n.get_active_connections()) + .cloned() + } + + LoadBalanceStrategy::Weighted => { + // Weighted round-robin: distribute based on node weights + // Build a weighted list where nodes appear multiple times based on weight + let mut weighted_nodes = Vec::new(); + for node in nodes { + for _ in 0..node.weight { + weighted_nodes.push(node.clone()); + } + } + + if weighted_nodes.is_empty() { + Some(nodes[0].clone()) + } else { + let index = self.round_robin_counter.fetch_add(1, Ordering::SeqCst) + % weighted_nodes.len(); + Some(weighted_nodes[index].clone()) + } + } + + LoadBalanceStrategy::Random => { + // Random selection using a simple pseudo-random approach + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as usize; + let index = seed % nodes.len(); + Some(nodes[index].clone()) + } + } + } + + /// Prints current statistics for all nodes + fn print_stats(&self) { + println!("\n=== Load Balancer Statistics ==="); + println!("Strategy: {:?}", self.strategy); + println!("\nNode Statistics:"); + + for (i, node) in self.nodes.iter().enumerate() { + let total = node.total_requests.load(Ordering::Relaxed); + let failed = node.failed_requests.load(Ordering::Relaxed); + let active = node.get_active_connections(); + let avg_time = node.average_response_time_ms(); + let health_status = if node.is_healthy() { + "HEALTHY" + } else { + "UNHEALTHY" + }; + + println!("\nNode {}: {}:{}", i + 1, node.host, node.port); + println!(" Status: {}", health_status); + println!(" Weight: {}", node.weight); + println!(" Total Requests: {}", total); + println!(" Failed Requests: {}", failed); + println!(" Active Connections: {}", active); + println!(" Avg Response Time: {:.2} ms", avg_time); + + if total > 0 { + let success_rate = ((total - failed) as f64 / total as f64) * 100.0; + println!(" Success Rate: {:.1}%", success_rate); + } + } + println!("\n================================"); + } +} + +#[async_trait::async_trait] +impl RequestHooks for LoadBalancerHook { + fn name(&self) -> &'static str { + "load-balancer" + } + + /// Called before the request is signed - this is where we redirect to selected node + async fn before_signing_mut( + &self, + method: &Method, + url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + bucket_name: Option<&str>, + object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + extensions: &mut Extensions, + ) -> Result<(), Error> { + // Select the target node + let node = self + .select_node() + .ok_or_else(|| Error::Validation(minio::s3::error::ValidationErr::MissingBucketName))?; + + // Store the selected node and start time in extensions for later use + extensions.insert(node.clone()); + extensions.insert(Instant::now()); + + // Increment active connections for this node + node.active_connections.fetch_add(1, Ordering::SeqCst); + node.total_requests.fetch_add(1, Ordering::SeqCst); + + // Update the URL to point to the selected node + let original_host = url.host.clone(); + url.host = node.host.clone(); + url.port = node.port; + url.https = node.https; + + // Note: When we modify the URL here, the MinIO client will automatically add + // x-minio-redirect-from and x-minio-redirect-to headers to track the redirection + // for server-side telemetry and debugging. 
+ + if self.verbose { + println!( + "[{}] Routing {} request to {}:{} (was: {})", + chrono::Local::now().format("%H:%M:%S%.3f"), + method, + node.host, + node.port, + original_host + ); + + if let Some(bucket) = bucket_name { + print!(" - Bucket: {}", bucket); + if let Some(object) = object_name { + print!(", Object: {}", object); + } + println!(); + } + + println!( + " Active connections on this node: {}", + node.get_active_connections() + ); + } + + Ok(()) + } + + /// Called after the request completes - update metrics and health status + async fn after_execute( + &self, + method: &Method, + _url: &Url, + _region: &str, + _headers: &Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + resp: &Result, + extensions: &mut Extensions, + ) { + // Retrieve the node and start time from extensions + if let Some(node) = extensions.get::() { + // Decrement active connections + node.active_connections.fetch_sub(1, Ordering::SeqCst); + + // Calculate response time + if let Some(start_time) = extensions.get::() { + let duration = start_time.elapsed(); + let response_time_ms = duration.as_millis() as u64; + node.total_response_time_ms + .fetch_add(response_time_ms, Ordering::SeqCst); + + if self.verbose { + let status_str = match resp { + Ok(response) => format!("HTTP {}", response.status().as_u16()), + Err(err) => format!("Error: {}", err), + }; + + println!( + "[{}] Response from {}:{} - {} - {} - {:?}", + chrono::Local::now().format("%H:%M:%S%.3f"), + node.host, + node.port, + method, + status_str, + duration + ); + } + } + + // Track failures + if resp.is_err() || resp.as_ref().unwrap().status().is_server_error() { + node.failed_requests.fetch_add(1, Ordering::SeqCst); + + // Update health status if too many failures + node.update_health_status(); + + if !node.is_healthy() && self.verbose { + println!( + "WARNING: Node {}:{} marked as UNHEALTHY due to high failure rate", + node.host, node.port + ); + } + } + } + } +} + +/// Example telemetry hook that works alongside the load balancer +/// +/// This demonstrates how multiple hooks can work together to provide +/// comprehensive monitoring and logging. 
+#[derive(Debug)] +struct TelemetryHook { + request_count: Arc, + verbose: bool, +} + +impl TelemetryHook { + fn new(verbose: bool) -> Self { + Self { + request_count: Arc::new(AtomicU64::new(0)), + verbose, + } + } +} + +#[async_trait::async_trait] +impl RequestHooks for TelemetryHook { + fn name(&self) -> &'static str { + "telemetry" + } + + async fn before_signing_mut( + &self, + _method: &Method, + _url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + _extensions: &mut Extensions, + ) -> Result<(), Error> { + let count = self.request_count.fetch_add(1, Ordering::SeqCst) + 1; + + if self.verbose && count.is_multiple_of(10) { + println!("📊 Total requests processed: {}", count); + } + + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Parse command-line arguments + let args = Args::parse(); + + println!("🚀 MinIO Load Balancing Example"); + println!("================================\n"); + + // Get MinIO nodes from environment or use defaults + let nodes_str = env::var("MINIO_NODES") + .unwrap_or_else(|_| "localhost:9000,localhost:9001,localhost:9002".to_string()); + + println!("Configuring nodes from: {}", nodes_str); + + // Parse nodes and create Node instances + let mut nodes: Vec = Vec::new(); + for (i, node_str) in nodes_str.split(',').enumerate() { + // For weighted strategy, give different weights to nodes + let weight = if matches!(args.strategy, LoadBalanceStrategy::Weighted) { + // Example: first node gets weight 3, second gets 2, others get 1 + match i { + 0 => 3, + 1 => 2, + _ => 1, + } + } else { + 1 + }; + + let node = Node::new(node_str.trim(), false, weight); + println!( + " - Node {}: {}:{} (weight: {})", + i + 1, + node.host, + node.port, + weight + ); + nodes.push(node); + } + + if nodes.is_empty() { + eprintln!("❌ Error: No nodes configured. 
Set MINIO_NODES environment variable."); + std::process::exit(1); + } + + println!("\nStrategy: {:?}", args.strategy); + println!("Requests to make: {}", args.requests); + println!("Verbose mode: {}\n", args.verbose); + + // Get credentials from environment + let access_key = env::var("MINIO_ROOT_USER") + .or_else(|_| env::var("MINIO_ACCESS_KEY")) + .unwrap_or_else(|_| "minioadmin".to_string()); + + let secret_key = env::var("MINIO_ROOT_PASSWORD") + .or_else(|_| env::var("MINIO_SECRET_KEY")) + .unwrap_or_else(|_| "minioadmin".to_string()); + + // Create the load balancer hook + let load_balancer = LoadBalancerHook::new(nodes.clone(), args.strategy, args.verbose); + + // Create the telemetry hook + let telemetry = TelemetryHook::new(args.verbose); + + // Use the first node as the initial base URL (will be overridden by the hook) + let base_url = BaseUrl::from_str(&format!("http://{}:{}", nodes[0].host, nodes[0].port))?; + + // Build the MinIO client with our hooks + println!("Building MinIO client with load balancing hooks..."); + let client = MinioClientBuilder::new(base_url) + .provider(Some(StaticProvider::new(&access_key, &secret_key, None))) + .hook(Arc::new(load_balancer.clone())) + .hook(Arc::new(telemetry)) + .build()?; + + println!("✅ Client configured successfully\n"); + + // Create a test bucket name + let test_bucket = format!("load-balance-test-{}", chrono::Utc::now().timestamp()); + + println!("Creating test bucket: {}", test_bucket); + + // Try to create the bucket (might fail if it exists, that's ok) + match client.create_bucket(&test_bucket).build().send().await { + Ok(_) => println!("✅ Bucket created successfully"), + Err(e) => { + // Check if it's because the bucket already exists + if e.to_string().contains("BucketAlreadyOwnedByYou") + || e.to_string().contains("BucketAlreadyExists") + { + println!("ℹ️ Bucket already exists, continuing..."); + } else { + println!("⚠️ Could not create bucket: {}", e); + println!(" Continuing with tests anyway..."); + } + } + } + + println!("\nStarting load balanced requests...\n"); + + // Perform test requests + let start_time = Instant::now(); + let mut success_count = 0; + let mut failure_count = 0; + + for i in 0..args.requests { + // Mix of different operations to simulate real load + let operation = i % 4; + + let result = match operation { + 0 => { + // List buckets + client.list_buckets().build().send().await.map(|_| ()) + } + 1 => { + // Check if bucket exists + client + .bucket_exists(&test_bucket) + .build() + .send() + .await + .map(|_| ()) + } + 2 => { + // Stat a non-existent object (will fail but that's ok) + client + .stat_object(&test_bucket, format!("test-object-{}", i)) + .build() + .send() + .await + .map(|_| ()) + } + 3 => { + // List objects in bucket (just check if we can start the stream) + drop(client.list_objects(&test_bucket).build().to_stream()); + Ok(()) + } + _ => unreachable!(), + }; + + match result { + Ok(_) => success_count += 1, + Err(_) => failure_count += 1, + } + + // Small delay between requests to avoid overwhelming the servers + if i < args.requests - 1 { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + // Print progress + if !args.verbose && (i + 1) % 10 == 0 { + print!("."); + use std::io::Write; + std::io::stdout().flush().unwrap(); + } + } + + if !args.verbose { + println!(); // New line after progress dots + } + + let total_duration = start_time.elapsed(); + + println!("\n✅ Load testing complete!"); + println!("\n=== Final Results ==="); + println!("Total time: {:?}", 
total_duration); + println!("Total requests: {}", args.requests); + println!("Successful: {}", success_count); + println!("Failed: {}", failure_count); + println!( + "Success rate: {:.1}%", + (success_count as f64 / args.requests as f64) * 100.0 + ); + println!( + "Requests/sec: {:.2}", + args.requests as f64 / total_duration.as_secs_f64() + ); + + // Print detailed statistics from the load balancer + load_balancer.print_stats(); + + // Clean up: try to delete the test bucket + println!("\nCleaning up test bucket..."); + match client.delete_bucket(&test_bucket).build().send().await { + Ok(_) => println!("✅ Test bucket deleted"), + Err(e) => println!("ℹ️ Could not delete test bucket: {}", e), + } + + println!("\n🎉 Example complete!"); + + Ok(()) +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index e3ff522..de80285 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,3 @@ [toolchain] -channel = "1.88.0" -components = ["clippy", "rustfmt"] -#targets = ["x86_64-unknown-linux-musl"] \ No newline at end of file +channel = "1.90.0" +components = ["clippy", "rustfmt"] \ No newline at end of file diff --git a/src/s3/builders/put_object.rs b/src/s3/builders/put_object.rs index de186ff..61ea948 100644 --- a/src/s3/builders/put_object.rs +++ b/src/s3/builders/put_object.rs @@ -643,10 +643,10 @@ impl PutObjectContent { return Err(ValidationErr::TooManyParts(part_number as u64).into()); } - if let Some(exp) = object_size.value() { - if exp < total_read { - return Err(ValidationErr::TooMuchData(exp).into()); - } + if let Some(exp) = object_size.value() + && exp < total_read + { + return Err(ValidationErr::TooMuchData(exp).into()); } // Upload the part now. @@ -686,14 +686,14 @@ impl PutObjectContent { // Complete the multipart upload. let size = parts.iter().map(|p| p.size).sum(); - if let Some(expected) = object_size.value() { - if expected != size { - return Err(ValidationErr::InsufficientData { - expected, - got: size, - } - .into()); + if let Some(expected) = object_size.value() + && expected != size + { + return Err(ValidationErr::InsufficientData { + expected, + got: size, } + .into()); } let resp: CompleteMultipartUploadResponse = CompleteMultipartUpload { diff --git a/src/s3/client.rs b/src/s3/client.rs index ff640d3..dbcce27 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -18,8 +18,10 @@ use bytes::Bytes; use dashmap::DashMap; use http::HeaderMap; -use hyper::http::Method; +pub use hyper::http::Method; use reqwest::Body; +pub use reqwest::Response; +use std::fmt::Debug; use std::fs::File; use std::io::prelude::*; use std::mem; @@ -28,12 +30,13 @@ use std::sync::{Arc, OnceLock}; use uuid::Uuid; use crate::s3::builders::{BucketExists, ComposeSource}; +pub use crate::s3::client::hooks::RequestHooks; use crate::s3::creds::Provider; #[cfg(feature = "localhost")] use crate::s3::creds::StaticProvider; use crate::s3::error::{Error, IoError, NetworkError, S3ServerError, ValidationErr}; use crate::s3::header_constants::*; -use crate::s3::http::BaseUrl; +use crate::s3::http::{BaseUrl, Url}; use crate::s3::minio_error_response::{MinioErrorCode, MinioErrorResponse}; use crate::s3::multimap_ext::{Multimap, MultimapExt}; use crate::s3::response::a_response_traits::{HasEtagFromHeaders, HasS3Fields}; @@ -72,6 +75,7 @@ mod get_object_tagging; mod get_presigned_object_url; mod get_presigned_post_form_data; mod get_region; +pub mod hooks; mod list_buckets; mod list_objects; mod listen_bucket_notification; @@ -129,6 +133,7 @@ pub struct MinioClientBuilder { base_url: BaseUrl, /// 
Set the credential provider. If not set, anonymous access is used. provider: Option>, + client_hooks: Vec>, /// Set file for loading CAs certs to trust. This is in addition to the system trust store. The file must contain PEM encoded certificates. ssl_cert_file: Option, /// Set flag to ignore certificate check. This is insecure and should only be used for testing. ignore_cert_check: Option, @@ -144,12 +149,20 @@ impl MinioClientBuilder { Self { base_url, provider: None, + client_hooks: Vec::new(), ssl_cert_file: None, ignore_cert_check: None, app_info: None, } } + + /// Add a client hook to the builder. Hooks are called in the + /// order they were added. + pub fn hook(mut self, hooks: Arc) -> Self { + self.client_hooks.push(hooks); + self + } + /// Set the credential provider. If not set, anonymous access is used. pub fn provider(mut self, provider: Option

) -> Self { self.provider = provider.map(|p| Arc::new(p) as Arc); @@ -223,6 +236,7 @@ impl MinioClientBuilder { shared: Arc::new(SharedClientItems { base_url: self.base_url, provider: self.provider, + client_hooks: self.client_hooks, region_map: Default::default(), express: Default::default(), }), @@ -441,55 +455,81 @@ impl MinioClient { body: Option>, retry: bool, ) -> Result { - let url = self.shared.base_url.build_url( + let mut url = self.shared.base_url.build_url( method, region, query_params, bucket_name, object_name, )?; + let mut extensions = http::Extensions::default(); - { - headers.add(HOST, url.host_header_value()); - let sha256: String = match *method { - Method::PUT | Method::POST => { - if !headers.contains_key(CONTENT_TYPE) { - headers.add(CONTENT_TYPE, "application/octet-stream"); - } - let len: usize = body.as_ref().map_or(0, |b| b.len()); - headers.add(CONTENT_LENGTH, len.to_string()); - match body { - None => EMPTY_SHA256.into(), - Some(ref v) => { - let clone = v.clone(); - async_std::task::spawn_blocking(move || sha256_hash_sb(clone)).await - } + headers.add(HOST, url.host_header_value()); + let sha256: String = match *method { + Method::PUT | Method::POST => { + if !headers.contains_key(CONTENT_TYPE) { + headers.add(CONTENT_TYPE, "application/octet-stream"); + } + let len: usize = body.as_ref().map_or(0, |b| b.len()); + headers.add(CONTENT_LENGTH, len.to_string()); + match body { + None => EMPTY_SHA256.into(), + Some(ref v) => { + let clone = v.clone(); + async_std::task::spawn_blocking(move || sha256_hash_sb(clone)).await } } - _ => EMPTY_SHA256.into(), - }; - headers.add(X_AMZ_CONTENT_SHA256, sha256.clone()); - - let date = utc_now(); - headers.add(X_AMZ_DATE, to_amz_date(date)); - if let Some(p) = &self.shared.provider { - let creds = p.fetch(); - if creds.session_token.is_some() { - headers.add(X_AMZ_SECURITY_TOKEN, creds.session_token.unwrap()); - } - sign_v4_s3( - method, - &url.path, - region, - headers, - query_params, - &creds.access_key, - &creds.secret_key, - &sha256, - date, - ); } + _ => EMPTY_SHA256.into(), + }; + headers.add(X_AMZ_CONTENT_SHA256, sha256.clone()); + + let date = utc_now(); + headers.add(X_AMZ_DATE, to_amz_date(date)); + + // Allow hooks to modify the request before signing (e.g., for client-side load balancing) + let url_before_hook = url.to_string(); + self.run_before_signing_hooks( + method, + &mut url, + region, + headers, + query_params, + bucket_name, + object_name, + &body, + &mut extensions, + ) + .await?; + + // If a hook modified the URL (e.g., redirecting to a different MinIO node for load balancing), + // add headers to inform the server about the client-side redirection. + // This enables server-side telemetry, debugging, and load balancing metrics. 
+ // x-minio-redirect-from: The original URL before hook modification + // x-minio-redirect-to: The actual endpoint where the request is being sent + if url.to_string() != url_before_hook { + headers.add("x-minio-redirect-from", &url_before_hook); + headers.add("x-minio-redirect-to", url.to_string()); } + + if let Some(p) = &self.shared.provider { + let creds = p.fetch(); + if creds.session_token.is_some() { + headers.add(X_AMZ_SECURITY_TOKEN, creds.session_token.unwrap()); + } + sign_v4_s3( + method, + &url.path, + region, + headers, + query_params, + &creds.access_key, + &creds.secret_key, + &sha256, + date, + ); + } + let mut req = self.http_client.request(method.clone(), url.to_string()); for (key, values) in headers.iter_all() { @@ -498,31 +538,9 @@ impl MinioClient { } } - if false { - let mut header_strings: Vec = headers - .iter_all() - .map(|(k, v)| format!("{}: {}", k, v.join(","))) - .collect(); - - // Sort headers alphabetically by name - header_strings.sort(); - - let debug_str = format!( - "S3 request: {method} url={:?}; headers={:?}; body={body:?}", - url.path, - header_strings.join("; ") - ); - let truncated = if debug_str.len() > 1000 { - format!("{}...", &debug_str[..997]) - } else { - debug_str - }; - println!("{truncated}"); - } - if (*method == Method::PUT) || (*method == Method::POST) { //TODO: why-oh-why first collect into a vector and then iterate to a stream? - let bytes_vec: Vec = match body { + let bytes_vec: Vec = match body.as_ref() { Some(v) => v.iter().collect(), None => Vec::new(), }; @@ -532,7 +550,22 @@ impl MinioClient { req = req.body(Body::wrap_stream(stream)); } - let resp: reqwest::Response = req.send().await.map_err(ValidationErr::from)?; //TODO request error handled by network error layer + let resp = req.send().await; + + self.run_after_execute_hooks( + method, + &url, + region, + headers, + query_params, + bucket_name, + object_name, + &resp, + &mut extensions, + ) + .await; + + let resp = resp.map_err(ValidationErr::from)?; if resp.status().is_success() { return Ok(resp); } @@ -612,6 +645,64 @@ impl MinioClient { .await } + async fn run_after_execute_hooks( + &self, + method: &Method, + url: &Url, + region: &str, + headers: &mut Multimap, + query_params: &Multimap, + bucket_name: Option<&str>, + object_name: Option<&str>, + resp: &Result, + extensions: &mut http::Extensions, + ) { + for hook in self.shared.client_hooks.iter() { + hook.after_execute( + method, + url, + region, + headers, + query_params, + bucket_name, + object_name, + resp, + extensions, + ) + .await; + } + } + + async fn run_before_signing_hooks( + &self, + method: &Method, + url: &mut Url, + region: &str, + headers: &mut Multimap, + query_params: &Multimap, + bucket_name: Option<&str>, + object_name: Option<&str>, + body: &Option>, + extensions: &mut http::Extensions, + ) -> Result<(), Error> { + for hook in self.shared.client_hooks.iter() { + hook.before_signing_mut( + method, + url, + region, + headers, + query_params, + bucket_name, + object_name, + body.as_deref(), + extensions, + ) + .await + .inspect_err(|e| log::warn!("Hook {} failed {e}", hook.name()))?; + } + Ok(()) + } + /// create an example client for testing on localhost #[cfg(feature = "localhost")] pub fn create_client_on_localhost() @@ -632,6 +723,7 @@ impl MinioClient { pub(crate) struct SharedClientItems { pub(crate) base_url: BaseUrl, pub(crate) provider: Option>, + client_hooks: Vec>, region_map: DashMap, express: OnceLock, } diff --git a/src/s3/client/hooks.rs b/src/s3/client/hooks.rs new file mode 100644 
index 0000000..5d3943d --- /dev/null +++ b/src/s3/client/hooks.rs @@ -0,0 +1,536 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Request hooks for intercepting and modifying S3 API requests. +//! +//! Hooks allow you to intercept requests at two key points: +//! - [`RequestHooks::before_signing_mut`] - Modify URL, headers, and parameters before signing +//! - [`RequestHooks::after_execute`] - Inspect responses and track metrics +//! +//! Common use cases: load balancing, telemetry, debug logging, request routing. +//! +//! When a hook modifies the URL, the client adds `x-minio-redirect-from` and `x-minio-redirect-to` headers for server-side tracking. + +pub use http::Extensions; + +use crate::s3::error::Error; +use crate::s3::http::Url; +use crate::s3::multimap_ext::Multimap; +use crate::s3::segmented_bytes::SegmentedBytes; +use http::Method; +use reqwest::Response; +use std::fmt::Debug; + +/// Trait for intercepting and modifying S3 API requests. +/// +/// Hooks are called in order and can abort requests by returning errors from `before_signing_mut`. +/// +/// # Examples +/// +/// ## Load Balancing +/// Redirect requests across multiple MinIO nodes: +/// ```no_run +/// use minio::s3::client::RequestHooks; +/// use minio::s3::http::Url; +/// use minio::s3::error::Error; +/// use minio::s3::multimap_ext::Multimap; +/// use minio::s3::segmented_bytes::SegmentedBytes; +/// use http::{Method, Extensions}; +/// use std::sync::atomic::{AtomicUsize, Ordering}; +/// use std::sync::Arc; +/// +/// #[derive(Debug, Clone)] +/// struct LoadBalancerHook { +/// nodes: Vec<String>, +/// counter: Arc<AtomicUsize>, +/// } +/// +/// impl LoadBalancerHook { +/// fn new(nodes: Vec<String>) -> Self { +/// Self { +/// nodes, +/// counter: Arc::new(AtomicUsize::new(0)), +/// } +/// } +/// +/// fn select_node(&self) -> &str { +/// // Round-robin load balancing +/// let index = self.counter.fetch_add(1, Ordering::SeqCst) % self.nodes.len(); +/// &self.nodes[index] +/// } +/// } +/// +/// #[async_trait::async_trait] +/// impl RequestHooks for LoadBalancerHook { +/// fn name(&self) -> &'static str { +/// "load-balancer" +/// } +/// +/// async fn before_signing_mut( +/// &self, +/// _method: &Method, +/// url: &mut Url, +/// _region: &str, +/// _headers: &mut Multimap, +/// _query_params: &Multimap, +/// _bucket_name: Option<&str>, +/// _object_name: Option<&str>, +/// _body: Option<&SegmentedBytes>, +/// _extensions: &mut Extensions, +/// ) -> Result<(), Error> { +/// // Select a node based on load balancing strategy +/// url.host = self.select_node().to_string(); +/// // Note: The client will automatically add x-minio-redirect-from +/// // and x-minio-redirect-to headers when URL is modified +/// Ok(()) +/// } +/// } +/// +/// # fn main() {} +/// ``` +/// +/// ## Telemetry & Debug Logging +/// Track timing and log request details: +/// ```no_run +/// use minio::s3::client::RequestHooks; +/// use minio::s3::error::Error; +/// use 
minio::s3::http::Url; +/// use minio::s3::multimap_ext::Multimap; +/// use minio::s3::segmented_bytes::SegmentedBytes; +/// use http::{Method, Extensions}; +/// use reqwest::Response; +/// use std::time::Instant; +/// +/// #[derive(Debug)] +/// struct LoggingHook; +/// +/// #[async_trait::async_trait] +/// impl RequestHooks for LoggingHook { +/// fn name(&self) -> &'static str { "logger" } +/// +/// async fn before_signing_mut( +/// &self, +/// method: &Method, +/// url: &mut Url, +/// _region: &str, +/// _headers: &mut Multimap, +/// _query_params: &Multimap, +/// _bucket_name: Option<&str>, +/// _object_name: Option<&str>, +/// _body: Option<&SegmentedBytes>, +/// extensions: &mut Extensions, +/// ) -> Result<(), Error> { +/// println!("[REQ] {} {}", method, url); +/// extensions.insert(Instant::now()); +/// Ok(()) +/// } +/// +/// async fn after_execute( +/// &self, +/// _method: &Method, +/// _url: &Url, +/// _region: &str, +/// _headers: &Multimap, +/// _query_params: &Multimap, +/// _bucket_name: Option<&str>, +/// _object_name: Option<&str>, +/// resp: &Result, +/// extensions: &mut Extensions, +/// ) { +/// let duration = extensions.get::() +/// .map(|start| start.elapsed()) +/// .unwrap_or_default(); +/// +/// match resp { +/// Ok(r) => println!("[RESP] {} in {:?}", r.status(), duration), +/// Err(e) => println!("[ERR] {} in {:?}", e, duration), +/// } +/// } +/// } +/// +/// # fn main() {} +/// ``` +#[async_trait::async_trait] +pub trait RequestHooks: Debug { + /// Hook name for logging. + fn name(&self) -> &'static str; + + /// Called before signing. Modify URL/headers here. Return error to abort. + async fn before_signing_mut( + &self, + _method: &Method, + _url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + _extensions: &mut Extensions, + ) -> Result<(), Error> { + Ok(()) + } + + /// Called after execution. For logging/telemetry. Errors don't fail the request. 
+ async fn after_execute( + &self, + _method: &Method, + _url: &Url, + _region: &str, + _headers: &Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + _resp: &Result, + _extensions: &mut Extensions, + ) { + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::s3::multimap_ext::MultimapExt; + + #[test] + fn test_hook_trait_has_default_implementations() { + #[derive(Debug)] + struct MinimalHook; + + #[async_trait::async_trait] + impl RequestHooks for MinimalHook { + fn name(&self) -> &'static str { + "minimal-hook" + } + } + + let hook = MinimalHook; + assert_eq!(hook.name(), "minimal-hook"); + } + + #[tokio::test] + async fn test_hook_can_modify_url() { + #[derive(Debug)] + struct UrlModifyingHook; + + #[async_trait::async_trait] + impl RequestHooks for UrlModifyingHook { + fn name(&self) -> &'static str { + "url-modifier" + } + + async fn before_signing_mut( + &self, + _method: &Method, + url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + _extensions: &mut Extensions, + ) -> Result<(), Error> { + url.host = "modified-host.example.com".to_string(); + url.port = 9000; + Ok(()) + } + } + + let hook = UrlModifyingHook; + let mut url = Url { + https: true, + host: "original-host.example.com".to_string(), + port: 443, + path: "/bucket/object".to_string(), + query: Multimap::new(), + }; + let mut headers = Multimap::new(); + let query_params = Multimap::new(); + let mut extensions = Extensions::default(); + + let result = hook + .before_signing_mut( + &Method::GET, + &mut url, + "us-east-1", + &mut headers, + &query_params, + Some("bucket"), + Some("object"), + None, + &mut extensions, + ) + .await; + + assert!(result.is_ok()); + assert_eq!(url.host, "modified-host.example.com"); + assert_eq!(url.port, 9000); + } + + #[tokio::test] + async fn test_hook_can_modify_headers() { + #[derive(Debug)] + struct HeaderModifyingHook; + + #[async_trait::async_trait] + impl RequestHooks for HeaderModifyingHook { + fn name(&self) -> &'static str { + "header-modifier" + } + + async fn before_signing_mut( + &self, + _method: &Method, + _url: &mut Url, + _region: &str, + headers: &mut Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + _extensions: &mut Extensions, + ) -> Result<(), Error> { + headers.add("X-Custom-Header", "custom-value"); + Ok(()) + } + } + + let hook = HeaderModifyingHook; + let mut url = Url::default(); + let mut headers = Multimap::new(); + let query_params = Multimap::new(); + let mut extensions = Extensions::default(); + + let result = hook + .before_signing_mut( + &Method::GET, + &mut url, + "us-east-1", + &mut headers, + &query_params, + None, + None, + None, + &mut extensions, + ) + .await; + + assert!(result.is_ok()); + assert!(headers.contains_key("X-Custom-Header")); + assert_eq!( + headers.get("X-Custom-Header"), + Some(&"custom-value".to_string()) + ); + } + + #[tokio::test] + async fn test_hook_can_use_extensions() { + #[derive(Debug)] + struct ExtensionWritingHook; + + #[async_trait::async_trait] + impl RequestHooks for ExtensionWritingHook { + fn name(&self) -> &'static str { + "extension-writer" + } + + async fn before_signing_mut( + &self, + _method: &Method, + _url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + _bucket_name: Option<&str>, + _object_name: Option<&str>, + 
_body: Option<&SegmentedBytes>, + extensions: &mut Extensions, + ) -> Result<(), Error> { + extensions.insert("test-data".to_string()); + extensions.insert(42u32); + Ok(()) + } + } + + let hook = ExtensionWritingHook; + let mut url = Url::default(); + let mut headers = Multimap::new(); + let query_params = Multimap::new(); + let mut extensions = Extensions::default(); + + let result = hook + .before_signing_mut( + &Method::GET, + &mut url, + "us-east-1", + &mut headers, + &query_params, + None, + None, + None, + &mut extensions, + ) + .await; + + assert!(result.is_ok()); + assert_eq!(extensions.get::(), Some(&"test-data".to_string())); + assert_eq!(extensions.get::(), Some(&42u32)); + } + + #[tokio::test] + async fn test_hook_can_return_error() { + use crate::s3::error::ValidationErr; + + #[derive(Debug)] + struct ErrorReturningHook; + + #[async_trait::async_trait] + impl RequestHooks for ErrorReturningHook { + fn name(&self) -> &'static str { + "error-hook" + } + + async fn before_signing_mut( + &self, + _method: &Method, + _url: &mut Url, + _region: &str, + _headers: &mut Multimap, + _query_params: &Multimap, + bucket_name: Option<&str>, + _object_name: Option<&str>, + _body: Option<&SegmentedBytes>, + _extensions: &mut Extensions, + ) -> Result<(), Error> { + if bucket_name == Some("forbidden-bucket") { + return Err(Error::Validation(ValidationErr::InvalidBucketName { + name: "forbidden-bucket".to_string(), + reason: "Bucket access denied by hook".to_string(), + })); + } + Ok(()) + } + } + + let hook = ErrorReturningHook; + let mut url = Url::default(); + let mut headers = Multimap::new(); + let query_params = Multimap::new(); + let mut extensions = Extensions::default(); + + let result = hook + .before_signing_mut( + &Method::GET, + &mut url, + "us-east-1", + &mut headers, + &query_params, + Some("forbidden-bucket"), + None, + None, + &mut extensions, + ) + .await; + + assert!(result.is_err()); + match result { + Err(Error::Validation(ValidationErr::InvalidBucketName { name, reason })) => { + assert_eq!(name, "forbidden-bucket"); + assert!(reason.contains("denied by hook")); + } + _ => panic!("Expected InvalidBucketName error"), + } + } + + #[tokio::test] + async fn test_hook_default_after_execute() { + #[derive(Debug)] + struct NoOpHook; + + #[async_trait::async_trait] + impl RequestHooks for NoOpHook { + fn name(&self) -> &'static str { + "noop-hook" + } + } + + let hook = NoOpHook; + let mut url = Url::default(); + let mut headers = Multimap::new(); + let query_params = Multimap::new(); + let mut extensions = Extensions::default(); + + let result = hook + .before_signing_mut( + &Method::GET, + &mut url, + "us-east-1", + &mut headers, + &query_params, + None, + None, + None, + &mut extensions, + ) + .await; + + assert!(result.is_ok()); + } + + #[test] + fn test_debug_logging_format() { + // Test the debug logging formatting logic without async complexity + let method = Method::PUT; + let url = Url { + https: true, + host: "minio.example.com".to_string(), + port: 443, + path: "/bucket/object".to_string(), + query: Multimap::new(), + }; + let mut headers = Multimap::new(); + headers.add("Content-Type", "application/json"); + headers.add("Authorization", "AWS4-HMAC-SHA256 Credential=..."); + headers.add("x-amz-date", "20240101T000000Z"); + + let mut header_strings: Vec = headers + .iter_all() + .map(|(k, v)| format!("{}: {}", k, v.join(","))) + .collect(); + header_strings.sort(); + + let debug_str = format!( + "S3 request: {} url={}; headers={}", + method, + url, + 
header_strings.join("; ") + ); + + let truncated = if debug_str.len() > 1000 { + format!("{}...", &debug_str[..997]) + } else { + debug_str.clone() + }; + + assert!(debug_str.contains("S3 request:")); + assert!(debug_str.contains("PUT")); + assert!(debug_str.contains("minio.example.com")); + assert!(debug_str.contains("Content-Type")); + assert!(debug_str.contains("Authorization")); + assert!(debug_str.contains("x-amz-date")); + assert_eq!(truncated, debug_str); + } +} diff --git a/src/s3/error.rs b/src/s3/error.rs index d9f6cb7..eb3b797 100644 --- a/src/s3/error.rs +++ b/src/s3/error.rs @@ -236,6 +236,12 @@ pub enum ValidationErr { #[error("Content length is unknown")] ContentLengthUnknown, + + #[error("{name} interceptor failed: {source}")] + Hook { + source: Box, + name: String, + }, } impl From for ValidationErr { diff --git a/src/s3/object_content.rs b/src/s3/object_content.rs index e7e428c..15383c7 100644 --- a/src/s3/object_content.rs +++ b/src/s3/object_content.rs @@ -253,7 +253,7 @@ impl ObjectContent { } } -pub(crate) struct ContentStream { +pub struct ContentStream { r: Pin> + Send>>, extra: Option, size: Size,
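For quick reference, a minimal sketch of registering a custom hook via the `RequestHooks` trait and `MinioClientBuilder::hook` added in this change; the endpoint URL and credentials are placeholders, and the hook only overrides `name()`, relying on the default `before_signing_mut` and `after_execute` implementations.

```rust
// Minimal usage sketch (placeholder endpoint and credentials).
use minio::s3::client::{MinioClientBuilder, RequestHooks};
use minio::s3::creds::StaticProvider;
use std::sync::Arc;

/// A hook that keeps the default `before_signing_mut` / `after_execute` bodies.
#[derive(Debug)]
struct NoOpHook;

#[async_trait::async_trait]
impl RequestHooks for NoOpHook {
    fn name(&self) -> &'static str {
        "noop"
    }
}

fn build_client() -> Result<(), Box<dyn std::error::Error>> {
    let client = MinioClientBuilder::new("http://localhost:9000".parse()?)
        .provider(Some(StaticProvider::new("minioadmin", "minioadmin", None)))
        .hook(Arc::new(NoOpHook)) // hooks run in the order they are added
        .build()?;
    let _ = client;
    Ok(())
}
```

Hooks registered this way are invoked in registration order: `before_signing_mut` on the outgoing request, then `after_execute` once the response or error is available.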