feat: add request hooks infrastructure (#188)

Adds RequestHooks trait to enable intercepting and modifying S3 API
  requests at key points in the request lifecycle. This enables implementing
  cross-cutting concerns like load balancing, telemetry, and debug logging
  without modifying the core request handling logic.

---------

Co-authored-by: Tobias Pütz <tobias@minio.io>
This commit is contained in:
Henk-Jan Lebbink 2025-11-10 14:39:12 +01:00 committed by GitHub
parent 1ee21b7591
commit f4b4c9086d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1690 additions and 89 deletions

View File

@ -34,14 +34,14 @@ All source files that haven't been generated MUST include the following copyrigh
### Comments ### Comments
- **NO redundant comments** - Code should be self-documenting - **NO redundant comments** - Code should be self-documenting
- Avoid obvious comments like `// Set x to 5` for `x := 5` - Avoid obvious comments like `// Set x to 5` for `let x = 5;`
- Only add comments when they explain WHY, not WHAT - Only add comments when they explain WHY, not WHAT
- Document complex algorithms or non-obvious business logic - Document complex algorithms or non-obvious business logic
## Critical Code Patterns ## Critical Code Patterns
### Builder Pattern ### Builder Pattern
All S3 API requests MUST use the builder pattern, with the following documentation but then for the appropriate API All S3 API requests MUST use the builder pattern, with documentation similar to the following example (adapted for each specific API)
```rust ```rust
/// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation. /// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation.
@ -73,6 +73,33 @@ impl Client {
} }
``` ```
### Rust-Specific Best Practices
1. **Ownership and Borrowing**
- Prefer `&str` over `&String` in function parameters
- Use `AsRef<str>` or `Into<String>` for flexible string parameters
- Return owned types from functions unless lifetime annotations are clear
2. **Type Safety**
- Use `#[must_use]` attribute for functions returning important values
- Prefer strong typing over primitive obsession
- Use newtypes for domain-specific values
3. **Unsafe Code**
- Avoid `unsafe` code unless absolutely necessary
- Document all safety invariants when `unsafe` is required
- Isolate `unsafe` blocks and keep them minimal
4. **Performance**
- Use `Cow<'_, str>` to avoid unnecessary allocations
- Prefer iterators over collecting into intermediate vectors
- Use `Box<dyn Trait>` sparingly; prefer generics when possible
5. **Async Patterns**
- Use `tokio::select!` for concurrent operations
- Avoid blocking operations in async contexts
- Use `async-trait` for async trait methods
## Code Quality Principles ## Code Quality Principles
### Why Code Quality Standards Are Mandatory ### Why Code Quality Standards Are Mandatory
@ -126,8 +153,9 @@ Complex distributed systems code must remain **human-readable**:
### Variables ### Variables
- Use meaningful variable names that reflect business concepts - Use meaningful variable names that reflect business concepts
- Variable names should reflect usage frequency: frequent variables can be shorter - Variable names should reflect usage frequency: frequent variables can be shorter
- Constants should follow Rust patterns - Constants should use SCREAMING_SNAKE_CASE (e.g., `MAX_RETRIES`, `DEFAULT_TIMEOUT`)
- Global variables should be clearly identified and documented for their system-wide purpose - Static variables should be clearly identified with proper safety documentation
- Prefer `const` over `static` when possible for compile-time constants
### Developer Documentation ### Developer Documentation
@ -179,7 +207,7 @@ Claude will periodically analyze the codebase and suggest:
Before any code changes: Before any code changes:
1. ✅ Run `cargo fmt --all` to check and fix code formatting 1. ✅ Run `cargo fmt --all` to check and fix code formatting
2. ✅ Run `cargo test` to ensure all tests pass 2. ✅ Run `cargo test` to ensure all tests pass
3. ✅ Run `cargo clippy` to check for common mistakes 3. ✅ Run `cargo clippy --all-targets --all-features --workspace -- -D warnings` to check for common mistakes and ensure no warnings
4. ✅ Ensure new code has appropriate test coverage 4. ✅ Ensure new code has appropriate test coverage
5. ✅ Verify no redundant comments are added 5. ✅ Verify no redundant comments are added
@ -222,6 +250,6 @@ fn operation() -> Result<Response, Error> {
- **Fix formatting**: `cargo fmt --all` - **Fix formatting**: `cargo fmt --all`
- **Run tests**: `cargo test` - **Run tests**: `cargo test`
- **Run specific test**: `cargo test test_name` - **Run specific test**: `cargo test test_name`
- **Check code**: `cargo clippy` - **Check code**: `cargo clippy --all-targets --all-features --workspace -- -D warnings`
- **Build project**: `cargo build --release` - **Build project**: `cargo build --release`
- **Generate docs**: `cargo doc --open` - **Generate docs**: `cargo doc --open`

View File

@ -56,10 +56,10 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = { version = "0.10", optional = true } sha2 = { version = "0.10", optional = true }
urlencoding = "2.1" urlencoding = "2.1"
xmltree = "0.11" xmltree = "0.12"
http = "1.3" http = "1.3"
thiserror = "2.0" thiserror = "2.0"
typed-builder = "0.22" typed-builder = "0.23"
[dev-dependencies] [dev-dependencies]
minio-common = { path = "./common" } minio-common = { path = "./common" }
@ -87,6 +87,9 @@ name = "object_prompt"
[[example]] [[example]]
name = "append_object" name = "append_object"
[[example]]
name = "load_balancing_with_hooks"
[[bench]] [[bench]]
name = "s3-api" name = "s3-api"
path = "benches/s3/api_benchmarks.rs" path = "benches/s3/api_benchmarks.rs"

View File

@ -144,7 +144,7 @@ impl TestContext {
/// - `CleanupGuard` - A guard that automatically deletes the bucket when dropped. /// - `CleanupGuard` - A guard that automatically deletes the bucket when dropped.
/// ///
/// # Example /// # Example
/// ```ignore /// ```no_run
/// let (bucket_name, guard) = client.create_bucket_helper().await; /// let (bucket_name, guard) = client.create_bucket_helper().await;
/// println!("Created temporary bucket: {}", bucket_name); /// println!("Created temporary bucket: {}", bucket_name);
/// // The bucket will be removed when `guard` is dropped. /// // The bucket will be removed when `guard` is dropped.

View File

@ -0,0 +1,235 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Example demonstrating how to use RequestHooks for debug logging.
//!
//! This example shows:
//! - Creating a custom debug logging hook
//! - Attaching the hook to the MinIO client
//! - Automatic logging of all S3 API requests with headers and response status
//! - Using both `before_signing_mut` and `after_execute` hooks
//!
//! Run with default values (test-bucket / test-object.txt, verbose mode enabled):
//! ```
//! cargo run --example debug_logging_hook
//! ```
//!
//! Run with custom bucket and object:
//! ```
//! cargo run --example debug_logging_hook -- mybucket myobject
//! ```
//!
//! Disable verbose output:
//! ```
//! cargo run --example debug_logging_hook -- --no-verbose
//! ```
use clap::{ArgAction, Parser};
use futures_util::StreamExt;
use minio::s3::builders::ObjectContent;
use minio::s3::client::hooks::{Extensions, RequestHooks};
use minio::s3::client::{Method, Response};
use minio::s3::creds::StaticProvider;
use minio::s3::error::Error;
use minio::s3::http::Url;
use minio::s3::multimap_ext::Multimap;
use minio::s3::response::BucketExistsResponse;
use minio::s3::segmented_bytes::SegmentedBytes;
use minio::s3::types::{S3Api, ToStream};
use minio::s3::{MinioClient, MinioClientBuilder};
use std::sync::Arc;
/// Debug logging hook that prints detailed information about each S3 request.
#[derive(Debug)]
struct DebugLoggingHook {
/// Enable verbose output including all headers
verbose: bool,
}
impl DebugLoggingHook {
fn new(verbose: bool) -> Self {
Self { verbose }
}
}
#[async_trait::async_trait]
impl RequestHooks for DebugLoggingHook {
fn name(&self) -> &'static str {
"debug-logger"
}
async fn before_signing_mut(
&self,
method: &Method,
url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
bucket_name: Option<&str>,
object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
_extensions: &mut Extensions,
) -> Result<(), Error> {
if self.verbose {
let bucket_obj = match (bucket_name, object_name) {
(Some(b), Some(o)) => format!("{b}/{o}"),
(Some(b), None) => b.to_string(),
_ => url.to_string(),
};
println!("→ Preparing {method} request for {bucket_obj}");
}
Ok(())
}
async fn after_execute(
&self,
method: &Method,
url: &Url,
_region: &str,
headers: &Multimap,
_query_params: &Multimap,
bucket_name: Option<&str>,
object_name: Option<&str>,
resp: &Result<Response, reqwest::Error>,
_extensions: &mut Extensions,
) {
// Format the basic request info
let bucket_obj = match (bucket_name, object_name) {
(Some(b), Some(o)) => format!("{b}/{o}"),
(Some(b), None) => b.to_string(),
_ => url.to_string(),
};
// Format response status
let status = match resp {
Ok(response) => format!("{}", response.status()),
Err(err) => format!("✗ Error: {err}"),
};
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("S3 Request: {method} {bucket_obj}");
println!("URL: {url}");
println!("Status: {status}");
if self.verbose {
// Print headers alphabetically
let mut header_strings: Vec<String> = headers
.iter_all()
.map(|(k, v)| format!("{}: {}", k, v.join(",")))
.collect();
header_strings.sort();
println!("\nRequest Headers:");
for header in header_strings {
println!(" {header}");
}
}
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
}
}
/// Example demonstrating debug logging with hooks
#[derive(Parser)]
struct Cli {
/// Bucket to use for the example
#[arg(default_value = "test-bucket")]
bucket: String,
/// Object to upload
#[arg(default_value = "test-object.txt")]
object: String,
/// Disable verbose output (verbose is enabled by default, use --no-verbose to disable)
#[arg(long = "no-verbose", action = ArgAction::SetFalse, default_value_t = true)]
verbose: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init();
let args = Cli::parse();
println!("\n🔧 MinIO Debug Logging Hook Example\n");
println!("This example demonstrates how hooks can be used for debugging S3 requests.");
println!(
"We'll perform a few operations on bucket '{}' with debug logging enabled.\n",
args.bucket
);
// Create the debug logging hook
let debug_hook = Arc::new(DebugLoggingHook::new(args.verbose));
// Create MinIO client with the debug logging hook attached
let static_provider = StaticProvider::new(
"Q3AM3UQ867SPQQA43P2F",
"zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
None,
);
let client: MinioClient = MinioClientBuilder::new("https://play.min.io".parse()?)
.provider(Some(static_provider))
.hook(debug_hook) // Attach the debug logging hook
.build()?;
println!("✓ Created MinIO client with debug logging hook\n");
// Operation 1: Check if bucket exists
println!("📋 Checking if bucket exists...");
let resp: BucketExistsResponse = client.bucket_exists(&args.bucket).build().send().await?;
// Operation 2: Create bucket if it doesn't exist
if !resp.exists() {
println!("\n📋 Creating bucket...");
client.create_bucket(&args.bucket).build().send().await?;
} else {
println!("\n✓ Bucket already exists");
}
// Operation 3: Upload a small object
println!("\n📋 Uploading object...");
let content = b"Hello from MinIO Rust SDK with debug logging!";
let object_content: ObjectContent = content.to_vec().into();
client
.put_object_content(&args.bucket, &args.object, object_content)
.build()
.send()
.await?;
// Operation 4: List objects in the bucket
println!("\n📋 Listing objects in bucket...");
let mut list_stream = client
.list_objects(&args.bucket)
.recursive(false)
.build()
.to_stream()
.await;
let mut total_objects = 0;
while let Some(result) = list_stream.next().await {
match result {
Ok(resp) => {
total_objects += resp.contents.len();
}
Err(e) => {
eprintln!("Error listing objects: {e}");
}
}
}
println!("\n✓ Found {total_objects} objects in bucket");
println!("\n🎉 All operations completed successfully with debug logging enabled!\n");
println!("💡 Tip: Run with --no-verbose to disable detailed output\n");
Ok(())
}

View File

@ -0,0 +1,702 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! # Load Balancing with Request Hooks Example
//!
//! This example demonstrates how to implement client-side load balancing for MinIO
//! using the RequestHooks infrastructure. It shows multiple load balancing strategies
//! and how to monitor their effectiveness.
//!
//! ## Key Concepts
//!
//! 1. **Request Hooks**: Intercept and modify requests before they're signed and sent
//! 2. **Load Balancing**: Distribute requests across multiple MinIO nodes
//! 3. **Health Checking**: Track node availability and response times
//! 4. **Telemetry**: Monitor load distribution and performance
//! 5. **Redirect Headers**: When URL is modified, automatic headers are added:
//! - `x-minio-redirect-from`: Original target URL
//! - `x-minio-redirect-to`: New destination URL after load balancing
//!
//! ## Usage
//!
//! ```bash
//! # Set up your MinIO cluster nodes
//! export MINIO_NODES="node1.minio.local:9000,node2.minio.local:9000,node3.minio.local:9000"
//! export MINIO_ROOT_USER="minioadmin"
//! export MINIO_ROOT_PASSWORD="minioadmin"
//!
//! # Run with round-robin strategy (default)
//! cargo run --example load_balancing_with_hooks
//!
//! # Run with least-connections strategy
//! cargo run --example load_balancing_with_hooks -- --strategy least-connections
//!
//! # Run with weighted round-robin
//! cargo run --example load_balancing_with_hooks -- --strategy weighted
//! ```
use clap::{Parser, ValueEnum};
use http::{Extensions, Method};
use minio::s3::client::{MinioClientBuilder, RequestHooks};
use minio::s3::creds::StaticProvider;
use minio::s3::error::Error;
use minio::s3::http::{BaseUrl, Url};
use minio::s3::multimap_ext::Multimap;
use minio::s3::segmented_bytes::SegmentedBytes;
use minio::s3::types::{S3Api, ToStream};
use reqwest::Response;
use std::env;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Command-line arguments for the load balancing example
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Load balancing strategy to use
#[arg(short, long, value_enum, default_value = "round-robin")]
strategy: LoadBalanceStrategy,
/// Number of requests to make for testing
#[arg(short = 'n', long, default_value = "100")]
requests: usize,
/// Enable verbose debug output
#[arg(short, long)]
verbose: bool,
}
/// Available load balancing strategies
#[derive(Clone, Copy, Debug, ValueEnum)]
enum LoadBalanceStrategy {
/// Simple round-robin: cycles through nodes sequentially
RoundRobin,
/// Least connections: routes to the node with fewest active connections
LeastConnections,
/// Weighted round-robin: distributes based on node weights/capacity
Weighted,
/// Random selection: randomly picks a node for each request
Random,
}
/// Represents a MinIO node in the cluster with its health and performance metrics
#[derive(Debug, Clone)]
struct Node {
/// The hostname or IP address of the node
host: String,
/// The port number (typically 9000 for MinIO)
port: u16,
/// Whether to use HTTPS for this node
https: bool,
/// Current number of active connections to this node
active_connections: Arc<AtomicUsize>,
/// Total number of requests sent to this node
total_requests: Arc<AtomicU64>,
/// Total response time in milliseconds for all requests
total_response_time_ms: Arc<AtomicU64>,
/// Number of failed requests to this node
failed_requests: Arc<AtomicU64>,
/// Weight for weighted round-robin (higher = more traffic)
weight: u32,
/// Whether the node is currently healthy
is_healthy: Arc<RwLock<bool>>,
/// Last time the node was checked for health
last_health_check: Arc<RwLock<Instant>>,
}
impl Node {
/// Creates a new node from a host:port string
///
/// # Arguments
/// * `host_port` - String in format "hostname:port" or just "hostname" (defaults to port 9000)
/// * `https` - Whether to use HTTPS for this node
/// * `weight` - Weight for weighted round-robin (default 1)
fn new(host_port: &str, https: bool, weight: u32) -> Self {
let parts: Vec<&str> = host_port.split(':').collect();
let (host, port) = if parts.len() == 2 {
(parts[0].to_string(), parts[1].parse().unwrap_or(9000))
} else {
(host_port.to_string(), 9000)
};
Self {
host,
port,
https,
active_connections: Arc::new(AtomicUsize::new(0)),
total_requests: Arc::new(AtomicU64::new(0)),
total_response_time_ms: Arc::new(AtomicU64::new(0)),
failed_requests: Arc::new(AtomicU64::new(0)),
weight,
is_healthy: Arc::new(RwLock::new(true)),
last_health_check: Arc::new(RwLock::new(Instant::now())),
}
}
/// Returns the current number of active connections
fn get_active_connections(&self) -> usize {
self.active_connections.load(Ordering::Relaxed)
}
/// Returns whether the node is currently considered healthy
fn is_healthy(&self) -> bool {
*self.is_healthy.read().unwrap()
}
/// Calculates the average response time in milliseconds
fn average_response_time_ms(&self) -> f64 {
let total_requests = self.total_requests.load(Ordering::Relaxed);
if total_requests == 0 {
0.0
} else {
self.total_response_time_ms.load(Ordering::Relaxed) as f64 / total_requests as f64
}
}
/// Updates the node's health status based on recent failures
fn update_health_status(&self) {
let total = self.total_requests.load(Ordering::Relaxed);
let failed = self.failed_requests.load(Ordering::Relaxed);
// Mark unhealthy if more than 50% of recent requests failed
let is_healthy = if total > 10 {
(failed as f64 / total as f64) < 0.5
} else {
true // Not enough data to determine
};
*self.is_healthy.write().unwrap() = is_healthy;
*self.last_health_check.write().unwrap() = Instant::now();
}
}
/// Main load balancer hook that implements various load balancing strategies
///
/// This hook intercepts all S3 requests and redirects them to different nodes
/// based on the selected strategy. It also tracks metrics for monitoring.
#[derive(Debug, Clone)]
struct LoadBalancerHook {
/// List of available MinIO nodes
nodes: Arc<Vec<Node>>,
/// Selected load balancing strategy
strategy: LoadBalanceStrategy,
/// Counter for round-robin strategy
round_robin_counter: Arc<AtomicUsize>,
/// Whether to print verbose debug information
verbose: bool,
}
impl LoadBalancerHook {
/// Creates a new load balancer hook
///
/// # Arguments
/// * `nodes` - List of MinIO nodes to balance between
/// * `strategy` - Load balancing strategy to use
/// * `verbose` - Whether to print debug information
fn new(nodes: Vec<Node>, strategy: LoadBalanceStrategy, verbose: bool) -> Self {
if nodes.is_empty() {
panic!("At least one node must be provided");
}
Self {
nodes: Arc::new(nodes),
strategy,
round_robin_counter: Arc::new(AtomicUsize::new(0)),
verbose,
}
}
/// Selects the next node based on the configured strategy
///
/// # Returns
/// The selected node, or None if no healthy nodes are available
fn select_node(&self) -> Option<Node> {
// Filter to only healthy nodes
let healthy_nodes: Vec<Node> = self
.nodes
.iter()
.filter(|n| n.is_healthy())
.cloned()
.collect();
if healthy_nodes.is_empty() {
// If no healthy nodes, try all nodes
if self.verbose {
println!("WARNING: No healthy nodes available, using all nodes");
}
return self.select_from_nodes(&self.nodes);
}
self.select_from_nodes(&healthy_nodes)
}
/// Internal method to select from a given set of nodes
fn select_from_nodes(&self, nodes: &[Node]) -> Option<Node> {
if nodes.is_empty() {
return None;
}
match self.strategy {
LoadBalanceStrategy::RoundRobin => {
// Simple round-robin: cycle through nodes sequentially
let index = self.round_robin_counter.fetch_add(1, Ordering::SeqCst) % nodes.len();
Some(nodes[index].clone())
}
LoadBalanceStrategy::LeastConnections => {
// Select the node with the fewest active connections
nodes
.iter()
.min_by_key(|n| n.get_active_connections())
.cloned()
}
LoadBalanceStrategy::Weighted => {
// Weighted round-robin: distribute based on node weights
// Build a weighted list where nodes appear multiple times based on weight
let mut weighted_nodes = Vec::new();
for node in nodes {
for _ in 0..node.weight {
weighted_nodes.push(node.clone());
}
}
if weighted_nodes.is_empty() {
Some(nodes[0].clone())
} else {
let index = self.round_robin_counter.fetch_add(1, Ordering::SeqCst)
% weighted_nodes.len();
Some(weighted_nodes[index].clone())
}
}
LoadBalanceStrategy::Random => {
// Random selection using a simple pseudo-random approach
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as usize;
let index = seed % nodes.len();
Some(nodes[index].clone())
}
}
}
/// Prints current statistics for all nodes
fn print_stats(&self) {
println!("\n=== Load Balancer Statistics ===");
println!("Strategy: {:?}", self.strategy);
println!("\nNode Statistics:");
for (i, node) in self.nodes.iter().enumerate() {
let total = node.total_requests.load(Ordering::Relaxed);
let failed = node.failed_requests.load(Ordering::Relaxed);
let active = node.get_active_connections();
let avg_time = node.average_response_time_ms();
let health_status = if node.is_healthy() {
"HEALTHY"
} else {
"UNHEALTHY"
};
println!("\nNode {}: {}:{}", i + 1, node.host, node.port);
println!(" Status: {}", health_status);
println!(" Weight: {}", node.weight);
println!(" Total Requests: {}", total);
println!(" Failed Requests: {}", failed);
println!(" Active Connections: {}", active);
println!(" Avg Response Time: {:.2} ms", avg_time);
if total > 0 {
let success_rate = ((total - failed) as f64 / total as f64) * 100.0;
println!(" Success Rate: {:.1}%", success_rate);
}
}
println!("\n================================");
}
}
#[async_trait::async_trait]
impl RequestHooks for LoadBalancerHook {
fn name(&self) -> &'static str {
"load-balancer"
}
/// Called before the request is signed - this is where we redirect to selected node
async fn before_signing_mut(
&self,
method: &Method,
url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
bucket_name: Option<&str>,
object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
extensions: &mut Extensions,
) -> Result<(), Error> {
// Select the target node
let node = self
.select_node()
.ok_or_else(|| Error::Validation(minio::s3::error::ValidationErr::MissingBucketName))?;
// Store the selected node and start time in extensions for later use
extensions.insert(node.clone());
extensions.insert(Instant::now());
// Increment active connections for this node
node.active_connections.fetch_add(1, Ordering::SeqCst);
node.total_requests.fetch_add(1, Ordering::SeqCst);
// Update the URL to point to the selected node
let original_host = url.host.clone();
url.host = node.host.clone();
url.port = node.port;
url.https = node.https;
// Note: When we modify the URL here, the MinIO client will automatically add
// x-minio-redirect-from and x-minio-redirect-to headers to track the redirection
// for server-side telemetry and debugging.
if self.verbose {
println!(
"[{}] Routing {} request to {}:{} (was: {})",
chrono::Local::now().format("%H:%M:%S%.3f"),
method,
node.host,
node.port,
original_host
);
if let Some(bucket) = bucket_name {
print!(" - Bucket: {}", bucket);
if let Some(object) = object_name {
print!(", Object: {}", object);
}
println!();
}
println!(
" Active connections on this node: {}",
node.get_active_connections()
);
}
Ok(())
}
/// Called after the request completes - update metrics and health status
async fn after_execute(
&self,
method: &Method,
_url: &Url,
_region: &str,
_headers: &Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
resp: &Result<Response, reqwest::Error>,
extensions: &mut Extensions,
) {
// Retrieve the node and start time from extensions
if let Some(node) = extensions.get::<Node>() {
// Decrement active connections
node.active_connections.fetch_sub(1, Ordering::SeqCst);
// Calculate response time
if let Some(start_time) = extensions.get::<Instant>() {
let duration = start_time.elapsed();
let response_time_ms = duration.as_millis() as u64;
node.total_response_time_ms
.fetch_add(response_time_ms, Ordering::SeqCst);
if self.verbose {
let status_str = match resp {
Ok(response) => format!("HTTP {}", response.status().as_u16()),
Err(err) => format!("Error: {}", err),
};
println!(
"[{}] Response from {}:{} - {} - {} - {:?}",
chrono::Local::now().format("%H:%M:%S%.3f"),
node.host,
node.port,
method,
status_str,
duration
);
}
}
// Track failures
if resp.is_err() || resp.as_ref().unwrap().status().is_server_error() {
node.failed_requests.fetch_add(1, Ordering::SeqCst);
// Update health status if too many failures
node.update_health_status();
if !node.is_healthy() && self.verbose {
println!(
"WARNING: Node {}:{} marked as UNHEALTHY due to high failure rate",
node.host, node.port
);
}
}
}
}
}
/// Example telemetry hook that works alongside the load balancer
///
/// This demonstrates how multiple hooks can work together to provide
/// comprehensive monitoring and logging.
#[derive(Debug)]
struct TelemetryHook {
request_count: Arc<AtomicU64>,
verbose: bool,
}
impl TelemetryHook {
fn new(verbose: bool) -> Self {
Self {
request_count: Arc::new(AtomicU64::new(0)),
verbose,
}
}
}
#[async_trait::async_trait]
impl RequestHooks for TelemetryHook {
fn name(&self) -> &'static str {
"telemetry"
}
async fn before_signing_mut(
&self,
_method: &Method,
_url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
_extensions: &mut Extensions,
) -> Result<(), Error> {
let count = self.request_count.fetch_add(1, Ordering::SeqCst) + 1;
if self.verbose && count.is_multiple_of(10) {
println!("📊 Total requests processed: {}", count);
}
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Parse command-line arguments
let args = Args::parse();
println!("🚀 MinIO Load Balancing Example");
println!("================================\n");
// Get MinIO nodes from environment or use defaults
let nodes_str = env::var("MINIO_NODES")
.unwrap_or_else(|_| "localhost:9000,localhost:9001,localhost:9002".to_string());
println!("Configuring nodes from: {}", nodes_str);
// Parse nodes and create Node instances
let mut nodes: Vec<Node> = Vec::new();
for (i, node_str) in nodes_str.split(',').enumerate() {
// For weighted strategy, give different weights to nodes
let weight = if matches!(args.strategy, LoadBalanceStrategy::Weighted) {
// Example: first node gets weight 3, second gets 2, others get 1
match i {
0 => 3,
1 => 2,
_ => 1,
}
} else {
1
};
let node = Node::new(node_str.trim(), false, weight);
println!(
" - Node {}: {}:{} (weight: {})",
i + 1,
node.host,
node.port,
weight
);
nodes.push(node);
}
if nodes.is_empty() {
eprintln!("❌ Error: No nodes configured. Set MINIO_NODES environment variable.");
std::process::exit(1);
}
println!("\nStrategy: {:?}", args.strategy);
println!("Requests to make: {}", args.requests);
println!("Verbose mode: {}\n", args.verbose);
// Get credentials from environment
let access_key = env::var("MINIO_ROOT_USER")
.or_else(|_| env::var("MINIO_ACCESS_KEY"))
.unwrap_or_else(|_| "minioadmin".to_string());
let secret_key = env::var("MINIO_ROOT_PASSWORD")
.or_else(|_| env::var("MINIO_SECRET_KEY"))
.unwrap_or_else(|_| "minioadmin".to_string());
// Create the load balancer hook
let load_balancer = LoadBalancerHook::new(nodes.clone(), args.strategy, args.verbose);
// Create the telemetry hook
let telemetry = TelemetryHook::new(args.verbose);
// Use the first node as the initial base URL (will be overridden by the hook)
let base_url = BaseUrl::from_str(&format!("http://{}:{}", nodes[0].host, nodes[0].port))?;
// Build the MinIO client with our hooks
println!("Building MinIO client with load balancing hooks...");
let client = MinioClientBuilder::new(base_url)
.provider(Some(StaticProvider::new(&access_key, &secret_key, None)))
.hook(Arc::new(load_balancer.clone()))
.hook(Arc::new(telemetry))
.build()?;
println!("✅ Client configured successfully\n");
// Create a test bucket name
let test_bucket = format!("load-balance-test-{}", chrono::Utc::now().timestamp());
println!("Creating test bucket: {}", test_bucket);
// Try to create the bucket (might fail if it exists, that's ok)
match client.create_bucket(&test_bucket).build().send().await {
Ok(_) => println!("✅ Bucket created successfully"),
Err(e) => {
// Check if it's because the bucket already exists
if e.to_string().contains("BucketAlreadyOwnedByYou")
|| e.to_string().contains("BucketAlreadyExists")
{
println!(" Bucket already exists, continuing...");
} else {
println!("⚠️ Could not create bucket: {}", e);
println!(" Continuing with tests anyway...");
}
}
}
println!("\nStarting load balanced requests...\n");
// Perform test requests
let start_time = Instant::now();
let mut success_count = 0;
let mut failure_count = 0;
for i in 0..args.requests {
// Mix of different operations to simulate real load
let operation = i % 4;
let result = match operation {
0 => {
// List buckets
client.list_buckets().build().send().await.map(|_| ())
}
1 => {
// Check if bucket exists
client
.bucket_exists(&test_bucket)
.build()
.send()
.await
.map(|_| ())
}
2 => {
// Stat a non-existent object (will fail but that's ok)
client
.stat_object(&test_bucket, format!("test-object-{}", i))
.build()
.send()
.await
.map(|_| ())
}
3 => {
// List objects in bucket (just check if we can start the stream)
drop(client.list_objects(&test_bucket).build().to_stream());
Ok(())
}
_ => unreachable!(),
};
match result {
Ok(_) => success_count += 1,
Err(_) => failure_count += 1,
}
// Small delay between requests to avoid overwhelming the servers
if i < args.requests - 1 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
// Print progress
if !args.verbose && (i + 1) % 10 == 0 {
print!(".");
use std::io::Write;
std::io::stdout().flush().unwrap();
}
}
if !args.verbose {
println!(); // New line after progress dots
}
let total_duration = start_time.elapsed();
println!("\n✅ Load testing complete!");
println!("\n=== Final Results ===");
println!("Total time: {:?}", total_duration);
println!("Total requests: {}", args.requests);
println!("Successful: {}", success_count);
println!("Failed: {}", failure_count);
println!(
"Success rate: {:.1}%",
(success_count as f64 / args.requests as f64) * 100.0
);
println!(
"Requests/sec: {:.2}",
args.requests as f64 / total_duration.as_secs_f64()
);
// Print detailed statistics from the load balancer
load_balancer.print_stats();
// Clean up: try to delete the test bucket
println!("\nCleaning up test bucket...");
match client.delete_bucket(&test_bucket).build().send().await {
Ok(_) => println!("✅ Test bucket deleted"),
Err(e) => println!(" Could not delete test bucket: {}", e),
}
println!("\n🎉 Example complete!");
Ok(())
}

View File

@ -1,4 +1,3 @@
[toolchain] [toolchain]
channel = "1.88.0" channel = "1.90.0"
components = ["clippy", "rustfmt"] components = ["clippy", "rustfmt"]
#targets = ["x86_64-unknown-linux-musl"]

View File

@ -643,11 +643,11 @@ impl PutObjectContent {
return Err(ValidationErr::TooManyParts(part_number as u64).into()); return Err(ValidationErr::TooManyParts(part_number as u64).into());
} }
if let Some(exp) = object_size.value() { if let Some(exp) = object_size.value()
if exp < total_read { && exp < total_read
{
return Err(ValidationErr::TooMuchData(exp).into()); return Err(ValidationErr::TooMuchData(exp).into());
} }
}
// Upload the part now. // Upload the part now.
let resp: UploadPartResponse = UploadPart { let resp: UploadPartResponse = UploadPart {
@ -686,15 +686,15 @@ impl PutObjectContent {
// Complete the multipart upload. // Complete the multipart upload.
let size = parts.iter().map(|p| p.size).sum(); let size = parts.iter().map(|p| p.size).sum();
if let Some(expected) = object_size.value() { if let Some(expected) = object_size.value()
if expected != size { && expected != size
{
return Err(ValidationErr::InsufficientData { return Err(ValidationErr::InsufficientData {
expected, expected,
got: size, got: size,
} }
.into()); .into());
} }
}
let resp: CompleteMultipartUploadResponse = CompleteMultipartUpload { let resp: CompleteMultipartUploadResponse = CompleteMultipartUpload {
client: self.client, client: self.client,

View File

@ -18,8 +18,10 @@
use bytes::Bytes; use bytes::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use http::HeaderMap; use http::HeaderMap;
use hyper::http::Method; pub use hyper::http::Method;
use reqwest::Body; use reqwest::Body;
pub use reqwest::Response;
use std::fmt::Debug;
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::mem; use std::mem;
@ -28,12 +30,13 @@ use std::sync::{Arc, OnceLock};
use uuid::Uuid; use uuid::Uuid;
use crate::s3::builders::{BucketExists, ComposeSource}; use crate::s3::builders::{BucketExists, ComposeSource};
pub use crate::s3::client::hooks::RequestHooks;
use crate::s3::creds::Provider; use crate::s3::creds::Provider;
#[cfg(feature = "localhost")] #[cfg(feature = "localhost")]
use crate::s3::creds::StaticProvider; use crate::s3::creds::StaticProvider;
use crate::s3::error::{Error, IoError, NetworkError, S3ServerError, ValidationErr}; use crate::s3::error::{Error, IoError, NetworkError, S3ServerError, ValidationErr};
use crate::s3::header_constants::*; use crate::s3::header_constants::*;
use crate::s3::http::BaseUrl; use crate::s3::http::{BaseUrl, Url};
use crate::s3::minio_error_response::{MinioErrorCode, MinioErrorResponse}; use crate::s3::minio_error_response::{MinioErrorCode, MinioErrorResponse};
use crate::s3::multimap_ext::{Multimap, MultimapExt}; use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::a_response_traits::{HasEtagFromHeaders, HasS3Fields}; use crate::s3::response::a_response_traits::{HasEtagFromHeaders, HasS3Fields};
@ -72,6 +75,7 @@ mod get_object_tagging;
mod get_presigned_object_url; mod get_presigned_object_url;
mod get_presigned_post_form_data; mod get_presigned_post_form_data;
mod get_region; mod get_region;
pub mod hooks;
mod list_buckets; mod list_buckets;
mod list_objects; mod list_objects;
mod listen_bucket_notification; mod listen_bucket_notification;
@ -129,6 +133,7 @@ pub struct MinioClientBuilder {
base_url: BaseUrl, base_url: BaseUrl,
/// Set the credential provider. If not, set anonymous access is used. /// Set the credential provider. If not, set anonymous access is used.
provider: Option<Arc<dyn Provider + Send + Sync + 'static>>, provider: Option<Arc<dyn Provider + Send + Sync + 'static>>,
client_hooks: Vec<Arc<dyn RequestHooks + Send + Sync + 'static>>,
/// Set file for loading CAs certs to trust. This is in addition to the system trust store. The file must contain PEM encoded certificates. /// Set file for loading CAs certs to trust. This is in addition to the system trust store. The file must contain PEM encoded certificates.
ssl_cert_file: Option<PathBuf>, ssl_cert_file: Option<PathBuf>,
/// Set flag to ignore certificate check. This is insecure and should only be used for testing. /// Set flag to ignore certificate check. This is insecure and should only be used for testing.
@ -144,12 +149,20 @@ impl MinioClientBuilder {
Self { Self {
base_url, base_url,
provider: None, provider: None,
client_hooks: Vec::new(),
ssl_cert_file: None, ssl_cert_file: None,
ignore_cert_check: None, ignore_cert_check: None,
app_info: None, app_info: None,
} }
} }
/// Add a client hook to the builder. Hooks will be called after each other in
/// order they were added.
pub fn hook(mut self, hooks: Arc<dyn RequestHooks + Send + Sync + 'static>) -> Self {
self.client_hooks.push(hooks);
self
}
/// Set the credential provider. If not, set anonymous access is used. /// Set the credential provider. If not, set anonymous access is used.
pub fn provider<P: Provider + Send + Sync + 'static>(mut self, provider: Option<P>) -> Self { pub fn provider<P: Provider + Send + Sync + 'static>(mut self, provider: Option<P>) -> Self {
self.provider = provider.map(|p| Arc::new(p) as Arc<dyn Provider + Send + Sync + 'static>); self.provider = provider.map(|p| Arc::new(p) as Arc<dyn Provider + Send + Sync + 'static>);
@ -223,6 +236,7 @@ impl MinioClientBuilder {
shared: Arc::new(SharedClientItems { shared: Arc::new(SharedClientItems {
base_url: self.base_url, base_url: self.base_url,
provider: self.provider, provider: self.provider,
client_hooks: self.client_hooks,
region_map: Default::default(), region_map: Default::default(),
express: Default::default(), express: Default::default(),
}), }),
@ -441,15 +455,15 @@ impl MinioClient {
body: Option<Arc<SegmentedBytes>>, body: Option<Arc<SegmentedBytes>>,
retry: bool, retry: bool,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
let url = self.shared.base_url.build_url( let mut url = self.shared.base_url.build_url(
method, method,
region, region,
query_params, query_params,
bucket_name, bucket_name,
object_name, object_name,
)?; )?;
let mut extensions = http::Extensions::default();
{
headers.add(HOST, url.host_header_value()); headers.add(HOST, url.host_header_value());
let sha256: String = match *method { let sha256: String = match *method {
Method::PUT | Method::POST => { Method::PUT | Method::POST => {
@ -472,6 +486,32 @@ impl MinioClient {
let date = utc_now(); let date = utc_now();
headers.add(X_AMZ_DATE, to_amz_date(date)); headers.add(X_AMZ_DATE, to_amz_date(date));
// Allow hooks to modify the request before signing (e.g., for client-side load balancing)
let url_before_hook = url.to_string();
self.run_before_signing_hooks(
method,
&mut url,
region,
headers,
query_params,
bucket_name,
object_name,
&body,
&mut extensions,
)
.await?;
// If a hook modified the URL (e.g., redirecting to a different MinIO node for load balancing),
// add headers to inform the server about the client-side redirection.
// This enables server-side telemetry, debugging, and load balancing metrics.
// x-minio-redirect-from: The original URL before hook modification
// x-minio-redirect-to: The actual endpoint where the request is being sent
if url.to_string() != url_before_hook {
headers.add("x-minio-redirect-from", &url_before_hook);
headers.add("x-minio-redirect-to", url.to_string());
}
if let Some(p) = &self.shared.provider { if let Some(p) = &self.shared.provider {
let creds = p.fetch(); let creds = p.fetch();
if creds.session_token.is_some() { if creds.session_token.is_some() {
@ -489,7 +529,7 @@ impl MinioClient {
date, date,
); );
} }
}
let mut req = self.http_client.request(method.clone(), url.to_string()); let mut req = self.http_client.request(method.clone(), url.to_string());
for (key, values) in headers.iter_all() { for (key, values) in headers.iter_all() {
@ -498,31 +538,9 @@ impl MinioClient {
} }
} }
if false {
let mut header_strings: Vec<String> = headers
.iter_all()
.map(|(k, v)| format!("{}: {}", k, v.join(",")))
.collect();
// Sort headers alphabetically by name
header_strings.sort();
let debug_str = format!(
"S3 request: {method} url={:?}; headers={:?}; body={body:?}",
url.path,
header_strings.join("; ")
);
let truncated = if debug_str.len() > 1000 {
format!("{}...", &debug_str[..997])
} else {
debug_str
};
println!("{truncated}");
}
if (*method == Method::PUT) || (*method == Method::POST) { if (*method == Method::PUT) || (*method == Method::POST) {
//TODO: why-oh-why first collect into a vector and then iterate to a stream? //TODO: why-oh-why first collect into a vector and then iterate to a stream?
let bytes_vec: Vec<Bytes> = match body { let bytes_vec: Vec<Bytes> = match body.as_ref() {
Some(v) => v.iter().collect(), Some(v) => v.iter().collect(),
None => Vec::new(), None => Vec::new(),
}; };
@ -532,7 +550,22 @@ impl MinioClient {
req = req.body(Body::wrap_stream(stream)); req = req.body(Body::wrap_stream(stream));
} }
let resp: reqwest::Response = req.send().await.map_err(ValidationErr::from)?; //TODO request error handled by network error layer let resp = req.send().await;
self.run_after_execute_hooks(
method,
&url,
region,
headers,
query_params,
bucket_name,
object_name,
&resp,
&mut extensions,
)
.await;
let resp = resp.map_err(ValidationErr::from)?;
if resp.status().is_success() { if resp.status().is_success() {
return Ok(resp); return Ok(resp);
} }
@ -612,6 +645,64 @@ impl MinioClient {
.await .await
} }
async fn run_after_execute_hooks(
&self,
method: &Method,
url: &Url,
region: &str,
headers: &mut Multimap,
query_params: &Multimap,
bucket_name: Option<&str>,
object_name: Option<&str>,
resp: &Result<Response, reqwest::Error>,
extensions: &mut http::Extensions,
) {
for hook in self.shared.client_hooks.iter() {
hook.after_execute(
method,
url,
region,
headers,
query_params,
bucket_name,
object_name,
resp,
extensions,
)
.await;
}
}
async fn run_before_signing_hooks(
&self,
method: &Method,
url: &mut Url,
region: &str,
headers: &mut Multimap,
query_params: &Multimap,
bucket_name: Option<&str>,
object_name: Option<&str>,
body: &Option<Arc<SegmentedBytes>>,
extensions: &mut http::Extensions,
) -> Result<(), Error> {
for hook in self.shared.client_hooks.iter() {
hook.before_signing_mut(
method,
url,
region,
headers,
query_params,
bucket_name,
object_name,
body.as_deref(),
extensions,
)
.await
.inspect_err(|e| log::warn!("Hook {} failed {e}", hook.name()))?;
}
Ok(())
}
/// create an example client for testing on localhost /// create an example client for testing on localhost
#[cfg(feature = "localhost")] #[cfg(feature = "localhost")]
pub fn create_client_on_localhost() pub fn create_client_on_localhost()
@ -632,6 +723,7 @@ impl MinioClient {
pub(crate) struct SharedClientItems { pub(crate) struct SharedClientItems {
pub(crate) base_url: BaseUrl, pub(crate) base_url: BaseUrl,
pub(crate) provider: Option<Arc<dyn Provider + Send + Sync + 'static>>, pub(crate) provider: Option<Arc<dyn Provider + Send + Sync + 'static>>,
client_hooks: Vec<Arc<dyn RequestHooks + Send + Sync + 'static>>,
region_map: DashMap<String, String>, region_map: DashMap<String, String>,
express: OnceLock<bool>, express: OnceLock<bool>,
} }

536
src/s3/client/hooks.rs Normal file
View File

@ -0,0 +1,536 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Request hooks for intercepting and modifying S3 API requests.
//!
//! Hooks allow you to intercept requests at two key points:
//! - [`RequestHooks::before_signing_mut`] - Modify URL, headers, and parameters before signing
//! - [`RequestHooks::after_execute`] - Inspect responses and track metrics
//!
//! Common use cases: load balancing, telemetry, debug logging, request routing.
//!
//! When a hook modifies the URL, the client adds `x-minio-redirect-from` and `x-minio-redirect-to` headers for server-side tracking.
pub use http::Extensions;
use crate::s3::error::Error;
use crate::s3::http::Url;
use crate::s3::multimap_ext::Multimap;
use crate::s3::segmented_bytes::SegmentedBytes;
use http::Method;
use reqwest::Response;
use std::fmt::Debug;
/// Trait for intercepting and modifying S3 API requests.
///
/// Hooks are called in order and can abort requests by returning errors from `before_signing_mut`.
///
/// # Examples
///
/// ## Load Balancing
/// Redirect requests across multiple MinIO nodes:
/// ```no_run
/// use minio::s3::client::RequestHooks;
/// use minio::s3::http::Url;
/// use minio::s3::error::Error;
/// use minio::s3::multimap_ext::Multimap;
/// use minio::s3::segmented_bytes::SegmentedBytes;
/// use http::{Method, Extensions};
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
///
/// #[derive(Debug, Clone)]
/// struct LoadBalancerHook {
/// nodes: Vec<String>,
/// counter: Arc<AtomicUsize>,
/// }
///
/// impl LoadBalancerHook {
/// fn new(nodes: Vec<String>) -> Self {
/// Self {
/// nodes,
/// counter: Arc::new(AtomicUsize::new(0)),
/// }
/// }
///
/// fn select_node(&self) -> &str {
/// // Round-robin load balancing
/// let index = self.counter.fetch_add(1, Ordering::SeqCst) % self.nodes.len();
/// &self.nodes[index]
/// }
/// }
///
/// #[async_trait::async_trait]
/// impl RequestHooks for LoadBalancerHook {
/// fn name(&self) -> &'static str {
/// "load-balancer"
/// }
///
/// async fn before_signing_mut(
/// &self,
/// _method: &Method,
/// url: &mut Url,
/// _region: &str,
/// _headers: &mut Multimap,
/// _query_params: &Multimap,
/// _bucket_name: Option<&str>,
/// _object_name: Option<&str>,
/// _body: Option<&SegmentedBytes>,
/// _extensions: &mut Extensions,
/// ) -> Result<(), Error> {
/// // Select a node based on load balancing strategy
/// url.host = self.select_node().to_string();
/// // Note: The client will automatically add x-minio-redirect-from
/// // and x-minio-redirect-to headers when URL is modified
/// Ok(())
/// }
/// }
///
/// # fn main() {}
/// ```
///
/// ## Telemetry & Debug Logging
/// Track timing and log request details:
/// ```no_run
/// use minio::s3::client::RequestHooks;
/// use minio::s3::error::Error;
/// use minio::s3::http::Url;
/// use minio::s3::multimap_ext::Multimap;
/// use minio::s3::segmented_bytes::SegmentedBytes;
/// use http::{Method, Extensions};
/// use reqwest::Response;
/// use std::time::Instant;
///
/// #[derive(Debug)]
/// struct LoggingHook;
///
/// #[async_trait::async_trait]
/// impl RequestHooks for LoggingHook {
/// fn name(&self) -> &'static str { "logger" }
///
/// async fn before_signing_mut(
/// &self,
/// method: &Method,
/// url: &mut Url,
/// _region: &str,
/// _headers: &mut Multimap,
/// _query_params: &Multimap,
/// _bucket_name: Option<&str>,
/// _object_name: Option<&str>,
/// _body: Option<&SegmentedBytes>,
/// extensions: &mut Extensions,
/// ) -> Result<(), Error> {
/// println!("[REQ] {} {}", method, url);
/// extensions.insert(Instant::now());
/// Ok(())
/// }
///
/// async fn after_execute(
/// &self,
/// _method: &Method,
/// _url: &Url,
/// _region: &str,
/// _headers: &Multimap,
/// _query_params: &Multimap,
/// _bucket_name: Option<&str>,
/// _object_name: Option<&str>,
/// resp: &Result<Response, reqwest::Error>,
/// extensions: &mut Extensions,
/// ) {
/// let duration = extensions.get::<Instant>()
/// .map(|start| start.elapsed())
/// .unwrap_or_default();
///
/// match resp {
/// Ok(r) => println!("[RESP] {} in {:?}", r.status(), duration),
/// Err(e) => println!("[ERR] {} in {:?}", e, duration),
/// }
/// }
/// }
///
/// # fn main() {}
/// ```
#[async_trait::async_trait]
pub trait RequestHooks: Debug {
/// Hook name for logging.
fn name(&self) -> &'static str;
/// Called before signing. Modify URL/headers here. Return error to abort.
async fn before_signing_mut(
&self,
_method: &Method,
_url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
_extensions: &mut Extensions,
) -> Result<(), Error> {
Ok(())
}
/// Called after execution. For logging/telemetry. Errors don't fail the request.
async fn after_execute(
&self,
_method: &Method,
_url: &Url,
_region: &str,
_headers: &Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
_resp: &Result<Response, reqwest::Error>,
_extensions: &mut Extensions,
) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::s3::multimap_ext::MultimapExt;
#[test]
fn test_hook_trait_has_default_implementations() {
#[derive(Debug)]
struct MinimalHook;
#[async_trait::async_trait]
impl RequestHooks for MinimalHook {
fn name(&self) -> &'static str {
"minimal-hook"
}
}
let hook = MinimalHook;
assert_eq!(hook.name(), "minimal-hook");
}
#[tokio::test]
async fn test_hook_can_modify_url() {
#[derive(Debug)]
struct UrlModifyingHook;
#[async_trait::async_trait]
impl RequestHooks for UrlModifyingHook {
fn name(&self) -> &'static str {
"url-modifier"
}
async fn before_signing_mut(
&self,
_method: &Method,
url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
_extensions: &mut Extensions,
) -> Result<(), Error> {
url.host = "modified-host.example.com".to_string();
url.port = 9000;
Ok(())
}
}
let hook = UrlModifyingHook;
let mut url = Url {
https: true,
host: "original-host.example.com".to_string(),
port: 443,
path: "/bucket/object".to_string(),
query: Multimap::new(),
};
let mut headers = Multimap::new();
let query_params = Multimap::new();
let mut extensions = Extensions::default();
let result = hook
.before_signing_mut(
&Method::GET,
&mut url,
"us-east-1",
&mut headers,
&query_params,
Some("bucket"),
Some("object"),
None,
&mut extensions,
)
.await;
assert!(result.is_ok());
assert_eq!(url.host, "modified-host.example.com");
assert_eq!(url.port, 9000);
}
#[tokio::test]
async fn test_hook_can_modify_headers() {
#[derive(Debug)]
struct HeaderModifyingHook;
#[async_trait::async_trait]
impl RequestHooks for HeaderModifyingHook {
fn name(&self) -> &'static str {
"header-modifier"
}
async fn before_signing_mut(
&self,
_method: &Method,
_url: &mut Url,
_region: &str,
headers: &mut Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
_extensions: &mut Extensions,
) -> Result<(), Error> {
headers.add("X-Custom-Header", "custom-value");
Ok(())
}
}
let hook = HeaderModifyingHook;
let mut url = Url::default();
let mut headers = Multimap::new();
let query_params = Multimap::new();
let mut extensions = Extensions::default();
let result = hook
.before_signing_mut(
&Method::GET,
&mut url,
"us-east-1",
&mut headers,
&query_params,
None,
None,
None,
&mut extensions,
)
.await;
assert!(result.is_ok());
assert!(headers.contains_key("X-Custom-Header"));
assert_eq!(
headers.get("X-Custom-Header"),
Some(&"custom-value".to_string())
);
}
#[tokio::test]
async fn test_hook_can_use_extensions() {
#[derive(Debug)]
struct ExtensionWritingHook;
#[async_trait::async_trait]
impl RequestHooks for ExtensionWritingHook {
fn name(&self) -> &'static str {
"extension-writer"
}
async fn before_signing_mut(
&self,
_method: &Method,
_url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
_bucket_name: Option<&str>,
_object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
extensions: &mut Extensions,
) -> Result<(), Error> {
extensions.insert("test-data".to_string());
extensions.insert(42u32);
Ok(())
}
}
let hook = ExtensionWritingHook;
let mut url = Url::default();
let mut headers = Multimap::new();
let query_params = Multimap::new();
let mut extensions = Extensions::default();
let result = hook
.before_signing_mut(
&Method::GET,
&mut url,
"us-east-1",
&mut headers,
&query_params,
None,
None,
None,
&mut extensions,
)
.await;
assert!(result.is_ok());
assert_eq!(extensions.get::<String>(), Some(&"test-data".to_string()));
assert_eq!(extensions.get::<u32>(), Some(&42u32));
}
#[tokio::test]
async fn test_hook_can_return_error() {
use crate::s3::error::ValidationErr;
#[derive(Debug)]
struct ErrorReturningHook;
#[async_trait::async_trait]
impl RequestHooks for ErrorReturningHook {
fn name(&self) -> &'static str {
"error-hook"
}
async fn before_signing_mut(
&self,
_method: &Method,
_url: &mut Url,
_region: &str,
_headers: &mut Multimap,
_query_params: &Multimap,
bucket_name: Option<&str>,
_object_name: Option<&str>,
_body: Option<&SegmentedBytes>,
_extensions: &mut Extensions,
) -> Result<(), Error> {
if bucket_name == Some("forbidden-bucket") {
return Err(Error::Validation(ValidationErr::InvalidBucketName {
name: "forbidden-bucket".to_string(),
reason: "Bucket access denied by hook".to_string(),
}));
}
Ok(())
}
}
let hook = ErrorReturningHook;
let mut url = Url::default();
let mut headers = Multimap::new();
let query_params = Multimap::new();
let mut extensions = Extensions::default();
let result = hook
.before_signing_mut(
&Method::GET,
&mut url,
"us-east-1",
&mut headers,
&query_params,
Some("forbidden-bucket"),
None,
None,
&mut extensions,
)
.await;
assert!(result.is_err());
match result {
Err(Error::Validation(ValidationErr::InvalidBucketName { name, reason })) => {
assert_eq!(name, "forbidden-bucket");
assert!(reason.contains("denied by hook"));
}
_ => panic!("Expected InvalidBucketName error"),
}
}
#[tokio::test]
async fn test_hook_default_after_execute() {
#[derive(Debug)]
struct NoOpHook;
#[async_trait::async_trait]
impl RequestHooks for NoOpHook {
fn name(&self) -> &'static str {
"noop-hook"
}
}
let hook = NoOpHook;
let mut url = Url::default();
let mut headers = Multimap::new();
let query_params = Multimap::new();
let mut extensions = Extensions::default();
let result = hook
.before_signing_mut(
&Method::GET,
&mut url,
"us-east-1",
&mut headers,
&query_params,
None,
None,
None,
&mut extensions,
)
.await;
assert!(result.is_ok());
}
#[test]
fn test_debug_logging_format() {
// Test the debug logging formatting logic without async complexity
let method = Method::PUT;
let url = Url {
https: true,
host: "minio.example.com".to_string(),
port: 443,
path: "/bucket/object".to_string(),
query: Multimap::new(),
};
let mut headers = Multimap::new();
headers.add("Content-Type", "application/json");
headers.add("Authorization", "AWS4-HMAC-SHA256 Credential=...");
headers.add("x-amz-date", "20240101T000000Z");
let mut header_strings: Vec<String> = headers
.iter_all()
.map(|(k, v)| format!("{}: {}", k, v.join(",")))
.collect();
header_strings.sort();
let debug_str = format!(
"S3 request: {} url={}; headers={}",
method,
url,
header_strings.join("; ")
);
let truncated = if debug_str.len() > 1000 {
format!("{}...", &debug_str[..997])
} else {
debug_str.clone()
};
assert!(debug_str.contains("S3 request:"));
assert!(debug_str.contains("PUT"));
assert!(debug_str.contains("minio.example.com"));
assert!(debug_str.contains("Content-Type"));
assert!(debug_str.contains("Authorization"));
assert!(debug_str.contains("x-amz-date"));
assert_eq!(truncated, debug_str);
}
}

View File

@ -236,6 +236,12 @@ pub enum ValidationErr {
#[error("Content length is unknown")] #[error("Content length is unknown")]
ContentLengthUnknown, ContentLengthUnknown,
#[error("{name} interceptor failed: {source}")]
Hook {
source: Box<dyn std::error::Error + Send + Sync>,
name: String,
},
} }
impl From<reqwest::header::ToStrError> for ValidationErr { impl From<reqwest::header::ToStrError> for ValidationErr {

View File

@ -253,7 +253,7 @@ impl ObjectContent {
} }
} }
pub(crate) struct ContentStream { pub struct ContentStream {
r: Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>, r: Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>,
extra: Option<Bytes>, extra: Option<Bytes>,
size: Size, size: Size,