added CRC32, CRC32C, SHA1, SHA256 and CRC64NVME (#195)

Henk-Jan Lebbink 2026-01-06 18:43:25 +01:00 committed by GitHub
parent 1b7ae9e473
commit 7398f3a14e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 5015 additions and 82 deletions

View File

@@ -24,7 +24,7 @@ http2 = ["reqwest/http2"]
localhost = []
[workspace.dependencies]
-uuid = "1.18"
+uuid = "1.19"
futures-util = "0.3"
futures-io = "0.3"
reqwest = { version = "0.12", default-features = false }
@@ -49,7 +49,7 @@ async-stream = "0.3"
async-trait = "0.1"
base64 = "0.22"
chrono = { workspace = true, features = ["serde"] }
-crc = "3.4"
+crc-fast = "1.8"
dashmap = "6.1.0"
env_logger = "0.11"
hmac = { version = "0.12", optional = true }
@@ -64,6 +64,7 @@ regex = "1.12"
ring = { version = "0.17", optional = true, default-features = false, features = ["alloc"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
+sha1 = "0.10"
sha2 = { version = "0.10", optional = true }
urlencoding = "2.1"
xmltree = "0.12"
@@ -104,3 +105,8 @@ name = "load_balancing_with_hooks"
name = "s3-api"
path = "benches/s3/api_benchmarks.rs"
harness = false
+
+[[bench]]
+name = "bench_checksums"
+path = "benches/s3/bench_checksums.rs"
+harness = false
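The switch from crc to crc-fast above is what the new streaming hashers build on. As a point of reference, a minimal sketch of the crc_fast digest API exactly as aws_chunked.rs below uses it (the CrcAlgorithm names and the u64-to-u32 truncation are taken from that file; the helper name crc32c_of is illustrative, not part of the commit):

use crc_fast::{CrcAlgorithm, Digest};

// Incremental CRC32C over several buffers; crc-fast returns u64, and the
// CRC32 family is truncated to u32 before base64 encoding in this commit.
fn crc32c_of(parts: &[&[u8]]) -> u32 {
    let mut digest = Digest::new(CrcAlgorithm::Crc32Iscsi);
    for part in parts {
        digest.update(part);
    }
    digest.finalize() as u32
}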

View File

@ -0,0 +1,53 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2025 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
use minio::s3::utils::{
crc32_checksum, crc32c, crc64nvme_checksum, md5sum_hash, sha1_hash, sha256_checksum,
};
fn bench_checksums(c: &mut Criterion) {
let sizes = vec![
("1KB", 1024),
("10KB", 10 * 1024),
("100KB", 100 * 1024),
("1MB", 1024 * 1024),
("10MB", 10 * 1024 * 1024),
];
for (name, size) in sizes {
let data = vec![0u8; size];
let mut group = c.benchmark_group(format!("checksum_{}", name));
group.throughput(Throughput::Bytes(size as u64));
group.bench_function("CRC32", |b| b.iter(|| crc32_checksum(&data)));
group.bench_function("CRC32C", |b| b.iter(|| crc32c(&data)));
group.bench_function("CRC64NVME", |b| b.iter(|| crc64nvme_checksum(&data)));
group.bench_function("MD5", |b| b.iter(|| md5sum_hash(&data)));
group.bench_function("SHA1", |b| b.iter(|| sha1_hash(&data)));
group.bench_function("SHA256", |b| b.iter(|| sha256_checksum(&data)));
group.finish();
}
}
criterion_group!(benches, bench_checksums);
criterion_main!(benches);
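With the [[bench]] target registered in Cargo.toml above, this file runs via cargo bench --bench bench_checksums. For a quick sanity check outside Criterion, the same helpers can be called directly; a sketch assuming they are exported from minio::s3::utils as imported above (return values are only bound, not inspected):

use minio::s3::utils::{crc32_checksum, crc64nvme_checksum, sha256_checksum};

fn main() {
    // 1 KiB of zeros, the smallest size the benchmark measures.
    let data = vec![0u8; 1024];
    let _crc32 = crc32_checksum(&data);
    let _crc64 = crc64nvme_checksum(&data);
    let _sha256 = sha256_checksum(&data);
}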

View File

@@ -74,6 +74,14 @@ extern crate proc_macro;
/// // this test will not run if the MinIO server is NOT running in Express mode
/// }
/// ```
+/// - `ignore`: Mark the test as ignored (skipped by default). Run with `cargo test -- --ignored` to include.
+/// ```no_run
+/// use minio_common::test_context::TestContext;
+/// #[minio_macros::test(ignore = "Requires newer server version")]
+/// async fn my_test(ctx: TestContext, bucket_name: String) {
+///     // this test is skipped by default
+/// }
+/// ```
#[proc_macro_attribute]
pub fn test(
args: proc_macro::TokenStream,

View File

@@ -32,6 +32,8 @@ pub(crate) struct MacroArgs {
no_bucket: darling::util::Flag,
object_lock: darling::util::Flag,
no_cleanup: darling::util::Flag,
+/// Mark test as ignored (skipped by default, run with `cargo test -- --ignored`)
+ignore: Option<String>,
}
impl MacroArgs {
@@ -171,28 +173,33 @@ fn generate_tokio_test_header(args: &MacroArgs, sig: TokenStream) -> TokenStream
.as_ref()
.map(ToString::to_string)
.or(std::env::var("MINIO_TEST_TOKIO_RUNTIME_FLAVOR").ok());
-match (flavor, args.worker_threads) {
+// Generate #[ignore = "reason"] if specified
+let ignore_attr = args
+    .ignore
+    .as_ref()
+    .map(|reason| quote!(#[ignore = #reason]));
+let tokio_attr = match (flavor, args.worker_threads) {
(Some(flavor), None) => {
-    quote!(#[::tokio::test(flavor = #flavor)]
-        #sig
-    )
+    quote!(#[::tokio::test(flavor = #flavor)])
}
(None, Some(worker_threads)) => {
-    quote!(#[::tokio::test(worker_threads = #worker_threads)]
-        #sig
-    )
+    quote!(#[::tokio::test(worker_threads = #worker_threads)])
}
(None, None) => {
-    quote!(#[::tokio::test]
-        #sig
-    )
+    quote!(#[::tokio::test])
}
(Some(flavor), Some(worker_threads)) => {
-    quote!(#[::tokio::test(flavor = #flavor, worker_threads = #worker_threads)]
-        #sig
-    )
+    quote!(#[::tokio::test(flavor = #flavor, worker_threads = #worker_threads)])
}
-}
-}
+};
+quote!(
+    #ignore_attr
+    #tokio_attr
+    #sig
+)
}
fn generate_express_skip_logic(args: &MacroArgs, span: proc_macro2::Span) -> TokenStream {
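Taken together, the generated test now carries an #[ignore] attribute ahead of the #[tokio::test] attribute. A rough expansion sketch for the documented example, pieced together from the ignore_attr, tokio_attr, and sig fragments above (the wrapper body that builds TestContext is elided, and the bare signature is illustrative):

// Approximate expansion of #[minio_macros::test(ignore = "Requires newer server version")]
#[ignore = "Requires newer server version"]
#[::tokio::test]
async fn my_test() {
    // generated body: create the TestContext and bucket, then call the user's test fn
}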

src/s3/aws_chunked.rs (new file, 999 lines)
View File

@ -0,0 +1,999 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2025 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! AWS Chunked encoding with trailing checksum support.
//!
//! This module implements the `aws-chunked` content encoding format used by S3
//! for streaming uploads with trailing checksums. The format allows computing
//! checksums incrementally while streaming data, with the checksum value sent
//! as a trailer at the end of the body.
//!
//! # Unsigned Protocol Format (STREAMING-UNSIGNED-PAYLOAD-TRAILER)
//!
//! ```text
//! <hex-chunk-size>\r\n
//! <chunk-data>\r\n
//! ...
//! 0\r\n
//! x-amz-checksum-<algorithm>:<base64-value>\r\n
//! \r\n
//! ```
//!
//! # Signed Protocol Format (STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER)
//!
//! ```text
//! <hex-chunk-size>;chunk-signature=<sig>\r\n
//! <chunk-data>\r\n
//! ...
//! 0;chunk-signature=<final-sig>\r\n
//! x-amz-checksum-<algorithm>:<base64-value>\r\n
//! x-amz-trailer-signature:<trailer-sig>\r\n
//! \r\n
//! ```
//!
//! # Wire Format vs Canonical Form
//!
//! **Important**: There are two different line ending conventions:
//!
//! - **Wire format (HTTP protocol)**: Uses `\r\n` (CRLF) per RFC 9112 (HTTP/1.1)
//! - **Canonical form (for signing)**: Uses `\n` (LF) per AWS SigV4 spec
//!
//! When computing the trailer signature, AWS specifies:
//! ```text
//! hash('x-amz-checksum-crc32c:sOO8/Q==\n') // Note: \n not \r\n
//! ```
//!
//! But the actual bytes sent over HTTP use CRLF line endings.
//!
//! Reference: <https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming-trailers.html>
use crate::s3::signer::{ChunkSigningContext, sign_chunk, sign_trailer};
use crate::s3::utils::{ChecksumAlgorithm, b64_encode, sha256_hash};
use bytes::Bytes;
use crc_fast::{CrcAlgorithm, Digest as CrcFastDigest};
use futures_util::Stream;
#[cfg(feature = "ring")]
use ring::digest::{Context, SHA256};
use sha1::{Digest as Sha1Digest, Sha1};
#[cfg(not(feature = "ring"))]
use sha2::Sha256;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context as TaskContext, Poll};
/// Default chunk size for aws-chunked encoding (64 KB).
const DEFAULT_CHUNK_SIZE: usize = 64 * 1024;
/// Incremental checksum hasher for streaming computation.
enum StreamingHasher {
Crc32(CrcFastDigest),
Crc32c(CrcFastDigest),
Crc64nvme(CrcFastDigest),
Sha1(Sha1),
#[cfg(feature = "ring")]
Sha256(Context),
#[cfg(not(feature = "ring"))]
Sha256(Sha256),
}
impl StreamingHasher {
fn new(algorithm: ChecksumAlgorithm) -> Self {
match algorithm {
ChecksumAlgorithm::CRC32 => {
StreamingHasher::Crc32(CrcFastDigest::new(CrcAlgorithm::Crc32IsoHdlc))
}
ChecksumAlgorithm::CRC32C => {
StreamingHasher::Crc32c(CrcFastDigest::new(CrcAlgorithm::Crc32Iscsi))
}
ChecksumAlgorithm::CRC64NVME => {
StreamingHasher::Crc64nvme(CrcFastDigest::new(CrcAlgorithm::Crc64Nvme))
}
ChecksumAlgorithm::SHA1 => StreamingHasher::Sha1(Sha1::new()),
#[cfg(feature = "ring")]
ChecksumAlgorithm::SHA256 => StreamingHasher::Sha256(Context::new(&SHA256)),
#[cfg(not(feature = "ring"))]
ChecksumAlgorithm::SHA256 => StreamingHasher::Sha256(Sha256::new()),
}
}
fn update(&mut self, data: &[u8]) {
match self {
StreamingHasher::Crc32(d) => d.update(data),
StreamingHasher::Crc32c(d) => d.update(data),
StreamingHasher::Crc64nvme(d) => d.update(data),
StreamingHasher::Sha1(h) => h.update(data),
#[cfg(feature = "ring")]
StreamingHasher::Sha256(ctx) => ctx.update(data),
#[cfg(not(feature = "ring"))]
StreamingHasher::Sha256(h) => h.update(data),
}
}
fn finalize(self) -> String {
match self {
// crc-fast returns u64; CRC32 variants need cast to u32
StreamingHasher::Crc32(d) => b64_encode((d.finalize() as u32).to_be_bytes()),
StreamingHasher::Crc32c(d) => b64_encode((d.finalize() as u32).to_be_bytes()),
StreamingHasher::Crc64nvme(d) => b64_encode(d.finalize().to_be_bytes()),
StreamingHasher::Sha1(h) => {
let result = h.finalize();
b64_encode(&result[..])
}
#[cfg(feature = "ring")]
StreamingHasher::Sha256(ctx) => b64_encode(ctx.finish().as_ref()),
#[cfg(not(feature = "ring"))]
StreamingHasher::Sha256(h) => {
let result = h.finalize();
b64_encode(&result[..])
}
}
}
}
/// State machine for the aws-chunked encoder.
#[derive(Clone, Copy)]
enum EncoderState {
/// Emitting data chunks
Streaming,
/// Emitting the final zero-length chunk marker
FinalChunk,
/// Emitting the trailer with checksum
Trailer,
/// Done
Done,
}
/// AWS Chunked encoder that wraps data in aws-chunked format with trailing checksum.
///
/// This encoder takes input data and produces output in the following format:
/// ```text
/// <hex-size>\r\n
/// <data>\r\n
/// 0\r\n
/// x-amz-checksum-<alg>:<base64>\r\n
/// \r\n
/// ```
pub struct AwsChunkedEncoder<S> {
inner: S,
algorithm: ChecksumAlgorithm,
hasher: Option<StreamingHasher>,
state: EncoderState,
}
impl<S> AwsChunkedEncoder<S> {
/// Creates a new AWS chunked encoder wrapping the given stream.
pub fn new(inner: S, algorithm: ChecksumAlgorithm) -> Self {
Self {
inner,
algorithm,
hasher: Some(StreamingHasher::new(algorithm)),
state: EncoderState::Streaming,
}
}
}
impl<S, E> Stream for AwsChunkedEncoder<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
{
type Item = Result<Bytes, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.state {
EncoderState::Streaming => {
let inner = Pin::new(&mut self.inner);
match inner.poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
if chunk.is_empty() {
continue;
}
// Update checksum with raw data
if let Some(ref mut hasher) = self.hasher {
hasher.update(&chunk);
}
// Format: <hex-size>\r\n<data>\r\n
let chunk_header = format!("{:x}\r\n", chunk.len());
let mut output =
Vec::with_capacity(chunk_header.len() + chunk.len() + 2);
output.extend_from_slice(chunk_header.as_bytes());
output.extend_from_slice(&chunk);
output.extend_from_slice(b"\r\n");
return Poll::Ready(Some(Ok(Bytes::from(output))));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
// Input stream exhausted, move to final chunk
self.state = EncoderState::FinalChunk;
}
Poll::Pending => {
return Poll::Pending;
}
}
}
EncoderState::FinalChunk => {
// Emit "0\r\n" for the final zero-length chunk
self.state = EncoderState::Trailer;
return Poll::Ready(Some(Ok(Bytes::from_static(b"0\r\n"))));
}
EncoderState::Trailer => {
// Compute and emit the trailer
let hasher = self.hasher.take().expect("hasher should exist");
let checksum_value = hasher.finalize();
let trailer = format!(
"{}:{}\r\n\r\n",
self.algorithm.header_name(),
checksum_value
);
self.state = EncoderState::Done;
return Poll::Ready(Some(Ok(Bytes::from(trailer))));
}
EncoderState::Done => {
return Poll::Ready(None);
}
}
}
}
}
/// Calculates the encoded length for aws-chunked format.
///
/// For a given content length and chunk size, returns the total encoded length
/// including all chunk headers, the final zero-length chunk, and the trailer.
pub fn calculate_encoded_length(
content_length: u64,
chunk_size: usize,
algorithm: ChecksumAlgorithm,
) -> u64 {
let chunk_size = chunk_size as u64;
// Number of full chunks
let full_chunks = content_length / chunk_size;
// Size of the last partial chunk (0 if content divides evenly)
let last_chunk_size = content_length % chunk_size;
let has_partial = if last_chunk_size > 0 { 1 } else { 0 };
// Each chunk: "<hex-size>\r\n<data>\r\n"
// hex-size length varies based on chunk size
let hex_len_full = format!("{:x}", chunk_size).len() as u64;
let hex_len_partial = if last_chunk_size > 0 {
format!("{:x}", last_chunk_size).len() as u64
} else {
0
};
// Full chunks overhead: hex_len + 2 (\r\n) + chunk_size + 2 (\r\n)
let full_chunk_overhead = full_chunks * (hex_len_full + 2 + chunk_size + 2);
// Partial chunk overhead (if any)
let partial_chunk_overhead = if has_partial > 0 {
hex_len_partial + 2 + last_chunk_size + 2
} else {
0
};
// Final chunk: "0\r\n"
let final_chunk = 3;
// Trailer: "x-amz-checksum-<alg>:<base64>\r\n\r\n"
// Header name length + ":" + base64 checksum length + "\r\n\r\n"
let trailer_header_len = algorithm.header_name().len() as u64;
let checksum_b64_len = match algorithm {
ChecksumAlgorithm::CRC32 | ChecksumAlgorithm::CRC32C => 8, // 4 bytes -> 8 chars base64
ChecksumAlgorithm::CRC64NVME => 12, // 8 bytes -> 12 chars base64
ChecksumAlgorithm::SHA1 => 28, // 20 bytes -> 28 chars base64
ChecksumAlgorithm::SHA256 => 44, // 32 bytes -> 44 chars base64
};
let trailer_len = trailer_header_len + 1 + checksum_b64_len + 4; // +1 for ":", +4 for "\r\n\r\n"
full_chunk_overhead + partial_chunk_overhead + final_chunk + trailer_len
}
/// Returns the default chunk size for aws-chunked encoding.
pub fn default_chunk_size() -> usize {
DEFAULT_CHUNK_SIZE
}
// ===========================
// Signed AWS Chunked Encoder
// ===========================
/// State machine for the signed aws-chunked encoder.
#[derive(Clone, Copy)]
enum SignedEncoderState {
/// Emitting signed data chunks
Streaming,
/// Emitting the final zero-length chunk with signature
FinalChunk,
/// Emitting the checksum trailer header
Trailer,
/// Emitting the trailer signature
TrailerSignature,
/// Done
Done,
}
/// AWS Chunked encoder with chunk signing for STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER.
///
/// Each chunk is signed using the AWS Signature V4 chunk signing algorithm.
/// The final trailer is also signed with a trailer signature.
///
/// # Wire Format
///
/// ```text
/// <hex-size>;chunk-signature=<sig>\r\n
/// <data>\r\n
/// 0;chunk-signature=<final-sig>\r\n
/// x-amz-checksum-<alg>:<base64>\r\n
/// x-amz-trailer-signature:<trailer-sig>\r\n
/// \r\n
/// ```
pub struct SignedAwsChunkedEncoder<S> {
inner: S,
algorithm: ChecksumAlgorithm,
hasher: Option<StreamingHasher>,
state: SignedEncoderState,
// Signing context
signing_key: Arc<[u8]>,
date_time: String,
scope: String,
// Signature chain - each chunk's signature becomes the previous for the next
current_signature: String,
// Store the checksum value for trailer signature computation
checksum_value: Option<String>,
}
impl<S> SignedAwsChunkedEncoder<S> {
/// Creates a new signed AWS chunked encoder wrapping the given stream.
///
/// # Arguments
/// * `inner` - The underlying data stream
/// * `algorithm` - The checksum algorithm to use
/// * `context` - The chunk signing context from request signing
pub fn new(inner: S, algorithm: ChecksumAlgorithm, context: ChunkSigningContext) -> Self {
Self {
inner,
algorithm,
hasher: Some(StreamingHasher::new(algorithm)),
state: SignedEncoderState::Streaming,
signing_key: context.signing_key,
date_time: context.date_time,
scope: context.scope,
current_signature: context.seed_signature,
checksum_value: None,
}
}
/// Signs a chunk and returns the signature.
fn sign_chunk_data(&mut self, chunk_hash: &str) -> String {
let signature = sign_chunk(
&self.signing_key,
&self.date_time,
&self.scope,
&self.current_signature,
chunk_hash,
);
self.current_signature = signature.clone();
signature
}
}
impl<S, E> Stream for SignedAwsChunkedEncoder<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
{
type Item = Result<Bytes, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.state {
SignedEncoderState::Streaming => {
let inner = Pin::new(&mut self.inner);
match inner.poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
if chunk.is_empty() {
continue;
}
// Update checksum hasher with raw data
if let Some(ref mut hasher) = self.hasher {
hasher.update(&chunk);
}
// Compute SHA256 hash of chunk data for signing
let chunk_hash = sha256_hash(&chunk);
// Sign the chunk
let signature = self.sign_chunk_data(&chunk_hash);
// Format: <hex-size>;chunk-signature=<sig>\r\n<data>\r\n
let chunk_header =
format!("{:x};chunk-signature={}\r\n", chunk.len(), signature);
let mut output =
Vec::with_capacity(chunk_header.len() + chunk.len() + 2);
output.extend_from_slice(chunk_header.as_bytes());
output.extend_from_slice(&chunk);
output.extend_from_slice(b"\r\n");
return Poll::Ready(Some(Ok(Bytes::from(output))));
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
// Input stream exhausted, move to final chunk
self.state = SignedEncoderState::FinalChunk;
}
Poll::Pending => {
return Poll::Pending;
}
}
}
SignedEncoderState::FinalChunk => {
// Sign the empty chunk (SHA256 of empty string)
const EMPTY_SHA256: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
let signature = self.sign_chunk_data(EMPTY_SHA256);
// Emit "0;chunk-signature=<sig>\r\n"
let final_chunk = format!("0;chunk-signature={}\r\n", signature);
self.state = SignedEncoderState::Trailer;
return Poll::Ready(Some(Ok(Bytes::from(final_chunk))));
}
SignedEncoderState::Trailer => {
// Compute and store the checksum value
let hasher = self.hasher.take().expect("hasher should exist");
let checksum_value = hasher.finalize();
self.checksum_value = Some(checksum_value.clone());
// Emit the checksum trailer using CRLF (wire format per RFC 9112)
// Note: The canonical form for signing uses LF (\n), but HTTP wire
// format uses CRLF (\r\n). See module docs for details.
let trailer = format!(
"{}:{}\r\n",
self.algorithm.header_name().to_lowercase(),
checksum_value
);
self.state = SignedEncoderState::TrailerSignature;
return Poll::Ready(Some(Ok(Bytes::from(trailer))));
}
SignedEncoderState::TrailerSignature => {
// Compute the canonical trailers string for signing.
// IMPORTANT: AWS SigV4 canonical form uses LF (\n), NOT CRLF (\r\n).
// Per AWS docs: hash('x-amz-checksum-crc32c:sOO8/Q==\n')
// This differs from the wire format which uses CRLF.
let checksum_value =
self.checksum_value.as_ref().expect("checksum should exist");
let canonical_trailers = format!(
"{}:{}\n", // LF for canonical form (signing)
self.algorithm.header_name().to_lowercase(),
checksum_value
);
// Hash the canonical trailers
let trailers_hash = sha256_hash(canonical_trailers.as_bytes());
// Sign the trailer
let trailer_signature = sign_trailer(
&self.signing_key,
&self.date_time,
&self.scope,
&self.current_signature,
&trailers_hash,
);
// Emit trailer signature using CRLF (wire format per RFC 9112)
// Final \r\n\r\n marks end of trailer section
let trailer_sig_line =
format!("x-amz-trailer-signature:{}\r\n\r\n", trailer_signature);
self.state = SignedEncoderState::Done;
return Poll::Ready(Some(Ok(Bytes::from(trailer_sig_line))));
}
SignedEncoderState::Done => {
return Poll::Ready(None);
}
}
}
}
}
/// Calculates the encoded length for signed aws-chunked format.
///
/// For a given content length and chunk size, returns the total encoded length
/// including all chunk headers with signatures, the final zero-length chunk,
/// the checksum trailer, and the trailer signature.
pub fn calculate_signed_encoded_length(
content_length: u64,
chunk_size: usize,
algorithm: ChecksumAlgorithm,
) -> u64 {
let chunk_size = chunk_size as u64;
// Number of full chunks
let full_chunks = content_length / chunk_size;
// Size of the last partial chunk (0 if content divides evenly)
let last_chunk_size = content_length % chunk_size;
let has_partial = if last_chunk_size > 0 { 1 } else { 0 };
// Each signed chunk: "<hex-size>;chunk-signature=<64-hex>\r\n<data>\r\n"
// Signature overhead per chunk: ";chunk-signature=" (17) + 64 hex chars = 81 bytes
let signature_overhead: u64 = 81;
let hex_len_full = format!("{:x}", chunk_size).len() as u64;
let hex_len_partial = if last_chunk_size > 0 {
format!("{:x}", last_chunk_size).len() as u64
} else {
0
};
// Full chunks: hex_len + signature_overhead + 2 (\r\n) + chunk_size + 2 (\r\n)
let full_chunk_overhead =
full_chunks * (hex_len_full + signature_overhead + 2 + chunk_size + 2);
// Partial chunk (if any)
let partial_chunk_overhead = if has_partial > 0 {
hex_len_partial + signature_overhead + 2 + last_chunk_size + 2
} else {
0
};
// Final chunk: "0;chunk-signature=<64-hex>\r\n" = 1 + 81 + 2 = 84
let final_chunk = 84;
// Checksum trailer: "<lowercase-header>:<base64>\r\n"
// Header name is lowercase (e.g., "x-amz-checksum-crc32")
let trailer_header_len = algorithm.header_name().to_lowercase().len() as u64;
let checksum_b64_len = match algorithm {
ChecksumAlgorithm::CRC32 | ChecksumAlgorithm::CRC32C => 8,
ChecksumAlgorithm::CRC64NVME => 12,
ChecksumAlgorithm::SHA1 => 28,
ChecksumAlgorithm::SHA256 => 44,
};
let checksum_trailer = trailer_header_len + 1 + checksum_b64_len + 2; // +1 for ":", +2 for "\r\n"
// Trailer signature: "x-amz-trailer-signature:<64-hex>\r\n\r\n"
// = 24 + 64 + 4 = 92 bytes
let trailer_signature = 92;
full_chunk_overhead
+ partial_chunk_overhead
+ final_chunk
+ checksum_trailer
+ trailer_signature
}
// ===========================
// Rechunking Stream Wrapper
// ===========================
/// A stream wrapper that re-chunks incoming data to a fixed chunk size.
///
/// This ensures that the actual chunks produced match the chunk size assumed
/// by `calculate_encoded_length` and `calculate_signed_encoded_length`,
/// preventing Content-Length mismatches when the input stream produces
/// differently-sized chunks.
pub struct RechunkingStream<S> {
inner: S,
chunk_size: usize,
buffer: Vec<u8>,
done: bool,
}
impl<S> RechunkingStream<S> {
/// Creates a new rechunking stream wrapper.
///
/// The wrapper buffers incoming data and emits chunks of exactly `chunk_size` bytes,
/// except for the final chunk which may be smaller.
pub fn new(inner: S, chunk_size: usize) -> Self {
Self {
inner,
chunk_size,
buffer: Vec::with_capacity(chunk_size),
done: false,
}
}
/// Creates a new rechunking stream with the default chunk size (64 KB).
pub fn with_default_chunk_size(inner: S) -> Self {
Self::new(inner, DEFAULT_CHUNK_SIZE)
}
}
impl<S, E> Stream for RechunkingStream<S>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
{
type Item = Result<Bytes, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
let chunk_size = self.chunk_size;
if self.done && self.buffer.is_empty() {
return Poll::Ready(None);
}
// If we already have a full chunk buffered, emit it
if self.buffer.len() >= chunk_size {
let chunk: Vec<u8> = self.buffer.drain(..chunk_size).collect();
return Poll::Ready(Some(Ok(Bytes::from(chunk))));
}
// Try to fill the buffer from the inner stream
loop {
if self.done {
// Inner stream exhausted, emit remaining buffer as final chunk
if self.buffer.is_empty() {
return Poll::Ready(None);
}
let remaining = std::mem::take(&mut self.buffer);
return Poll::Ready(Some(Ok(Bytes::from(remaining))));
}
let inner = Pin::new(&mut self.inner);
match inner.poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
if chunk.is_empty() {
continue;
}
self.buffer.extend_from_slice(&chunk);
// If we now have enough for a full chunk, emit it
if self.buffer.len() >= chunk_size {
let chunk: Vec<u8> = self.buffer.drain(..chunk_size).collect();
return Poll::Ready(Some(Ok(Bytes::from(chunk))));
}
// Otherwise continue buffering
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
self.done = true;
// Loop will handle emitting remaining buffer
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::StreamExt;
#[tokio::test]
async fn test_aws_chunked_encoder_simple() {
let data = Bytes::from("Hello, World!");
// Use iter instead of once - iter produces an Unpin stream
let stream = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(data.clone())]);
let mut encoder = AwsChunkedEncoder::new(stream, ChecksumAlgorithm::CRC32);
let mut output = Vec::new();
while let Some(chunk) = encoder.next().await {
output.extend_from_slice(&chunk.unwrap());
}
let output_str = String::from_utf8(output).unwrap();
// Should start with hex size of "Hello, World!" (13 bytes = 'd')
assert!(output_str.starts_with("d\r\n"));
// Should contain the data
assert!(output_str.contains("Hello, World!"));
// Should end with trailer (header name is mixed-case per S3 spec)
assert!(output_str.contains("X-Amz-Checksum-CRC32:"));
assert!(output_str.ends_with("\r\n\r\n"));
// Should have zero-length final chunk
assert!(output_str.contains("\r\n0\r\n"));
}
#[tokio::test]
async fn test_aws_chunked_encoder_multiple_chunks() {
let chunks = vec![
Ok::<_, std::io::Error>(Bytes::from("Hello, ")),
Ok(Bytes::from("World!")),
];
let stream = futures_util::stream::iter(chunks);
let mut encoder = AwsChunkedEncoder::new(stream, ChecksumAlgorithm::CRC64NVME);
let mut output = Vec::new();
while let Some(chunk) = encoder.next().await {
output.extend_from_slice(&chunk.unwrap());
}
let output_str = String::from_utf8(output).unwrap();
// Should have two chunk headers
assert!(output_str.starts_with("7\r\n")); // "Hello, " is 7 bytes
assert!(output_str.contains("6\r\n")); // "World!" is 6 bytes
assert!(output_str.contains("X-Amz-Checksum-CRC64NVME:"));
}
#[test]
fn test_calculate_encoded_length() {
// Simple case: 100 bytes, 64KB chunks
let len = calculate_encoded_length(100, 64 * 1024, ChecksumAlgorithm::CRC32);
// 100 bytes fits in one chunk: "64\r\n" (4) + 100 + "\r\n" (2) + "0\r\n" (3) + trailer
// "64" is hex for 100, which is "64" (2 chars)
// trailer: "x-amz-checksum-crc32:" (21) + 8 (base64) + "\r\n\r\n" (4) = 33
// Total: 2 + 2 + 100 + 2 + 3 + 33 = 142
assert!(len > 100); // Should be larger than raw content
}
// ===========================
// Signed Encoder Tests
// ===========================
fn test_signing_context() -> ChunkSigningContext {
ChunkSigningContext {
signing_key: Arc::from(vec![
// Pre-computed signing key for test credentials
0x98, 0xf1, 0xd8, 0x89, 0xfe, 0xc4, 0xf4, 0x42, 0x1a, 0xdc, 0x52, 0x2b, 0xab, 0x0c,
0xe1, 0xf8, 0x2c, 0x6c, 0x4e, 0x4e, 0xc3, 0x9a, 0xe1, 0xf6, 0xcc, 0xf2, 0x0e, 0x8f,
0x40, 0x89, 0x45, 0x65,
]),
date_time: "20130524T000000Z".to_string(),
scope: "20130524/us-east-1/s3/aws4_request".to_string(),
seed_signature: "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9"
.to_string(),
}
}
#[tokio::test]
async fn test_signed_encoder_simple() {
let data = Bytes::from("Hello, World!");
let stream = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(data)]);
let context = test_signing_context();
let mut encoder = SignedAwsChunkedEncoder::new(stream, ChecksumAlgorithm::CRC32, context);
let mut output = Vec::new();
while let Some(chunk) = encoder.next().await {
output.extend_from_slice(&chunk.unwrap());
}
let output_str = String::from_utf8(output).unwrap();
// Should start with hex size and chunk-signature
assert!(output_str.starts_with("d;chunk-signature="));
// Should contain the data
assert!(output_str.contains("Hello, World!"));
// Should have final chunk with signature
assert!(output_str.contains("0;chunk-signature="));
// Should have checksum trailer (lowercase)
assert!(output_str.contains("x-amz-checksum-crc32:"));
// Should have trailer signature
assert!(output_str.contains("x-amz-trailer-signature:"));
// Should end with \r\n\r\n
assert!(output_str.ends_with("\r\n\r\n"));
}
#[tokio::test]
async fn test_signed_encoder_multiple_chunks() {
let chunks = vec![
Ok::<_, std::io::Error>(Bytes::from("Hello, ")),
Ok(Bytes::from("World!")),
];
let stream = futures_util::stream::iter(chunks);
let context = test_signing_context();
let mut encoder = SignedAwsChunkedEncoder::new(stream, ChecksumAlgorithm::CRC32C, context);
let mut output = Vec::new();
while let Some(chunk) = encoder.next().await {
output.extend_from_slice(&chunk.unwrap());
}
let output_str = String::from_utf8(output).unwrap();
// Should have two chunk signatures (different signatures due to chaining)
let sig_count = output_str.matches(";chunk-signature=").count();
assert_eq!(sig_count, 3); // 2 data chunks + 1 final chunk
// Should have checksum trailer
assert!(output_str.contains("x-amz-checksum-crc32c:"));
// Should have trailer signature
assert!(output_str.contains("x-amz-trailer-signature:"));
}
#[tokio::test]
async fn test_signed_encoder_signature_is_64_hex_chars() {
let data = Bytes::from("test");
let stream = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(data)]);
let context = test_signing_context();
let mut encoder = SignedAwsChunkedEncoder::new(stream, ChecksumAlgorithm::CRC32, context);
let mut output = Vec::new();
while let Some(chunk) = encoder.next().await {
output.extend_from_slice(&chunk.unwrap());
}
let output_str = String::from_utf8(output).unwrap();
// Extract signatures and verify they're 64 hex chars
for sig_match in output_str.match_indices(";chunk-signature=") {
let start = sig_match.0 + sig_match.1.len();
let sig = &output_str[start..start + 64];
assert!(
sig.chars().all(|c| c.is_ascii_hexdigit()),
"Signature should be hex: {}",
sig
);
}
// Also check trailer signature
let trailer_sig_start = output_str.find("x-amz-trailer-signature:").unwrap() + 24;
let trailer_sig = &output_str[trailer_sig_start..trailer_sig_start + 64];
assert!(
trailer_sig.chars().all(|c| c.is_ascii_hexdigit()),
"Trailer signature should be hex: {}",
trailer_sig
);
}
#[test]
fn test_calculate_signed_encoded_length() {
// 100 bytes, 64KB chunks
let len = calculate_signed_encoded_length(100, 64 * 1024, ChecksumAlgorithm::CRC32);
// Should be larger than unsigned (due to signature overhead)
let unsigned_len = calculate_encoded_length(100, 64 * 1024, ChecksumAlgorithm::CRC32);
assert!(
len > unsigned_len,
"Signed length {} should be > unsigned length {}",
len,
unsigned_len
);
}
#[test]
fn test_calculate_signed_encoded_length_multiple_chunks() {
// 200KB with 64KB chunks = 3 full chunks + 1 partial + final
let content_len = 200 * 1024;
let chunk_size = 64 * 1024;
let len =
calculate_signed_encoded_length(content_len, chunk_size, ChecksumAlgorithm::SHA256);
// Should include all overhead
assert!(len > content_len);
// Calculate expected: signature overhead per chunk is 81 bytes
// Plus final chunk (84), checksum trailer, trailer signature (92)
}
// ===========================
// RechunkingStream Tests
// ===========================
#[tokio::test]
async fn test_rechunking_stream_combines_small_chunks() {
// Create many small 1KB chunks
let chunk_size = 1024;
let num_chunks = 10;
let chunks: Vec<Result<Bytes, std::io::Error>> = (0..num_chunks)
.map(|i| Ok(Bytes::from(vec![i as u8; chunk_size])))
.collect();
let stream = futures_util::stream::iter(chunks);
let mut rechunker = RechunkingStream::new(stream, 4096); // 4KB target
let mut output_chunks = Vec::new();
while let Some(chunk) = rechunker.next().await {
output_chunks.push(chunk.unwrap());
}
// 10 x 1KB = 10KB, rechunked to 4KB = 2 full + 1 partial (2KB)
assert_eq!(output_chunks.len(), 3);
assert_eq!(output_chunks[0].len(), 4096);
assert_eq!(output_chunks[1].len(), 4096);
assert_eq!(output_chunks[2].len(), 2048);
// Total bytes preserved
let total: usize = output_chunks.iter().map(|c| c.len()).sum();
assert_eq!(total, num_chunks * chunk_size);
}
#[tokio::test]
async fn test_rechunking_stream_passes_large_chunks() {
// Single chunk larger than target size
let data = Bytes::from(vec![42u8; 10000]);
let stream = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(data)]);
let mut rechunker = RechunkingStream::new(stream, 4096);
let mut output_chunks = Vec::new();
while let Some(chunk) = rechunker.next().await {
output_chunks.push(chunk.unwrap());
}
// 10000 bytes / 4096 = 2 full + 1 partial (1808)
assert_eq!(output_chunks.len(), 3);
assert_eq!(output_chunks[0].len(), 4096);
assert_eq!(output_chunks[1].len(), 4096);
assert_eq!(output_chunks[2].len(), 1808);
}
#[tokio::test]
async fn test_rechunking_stream_exact_multiple() {
// Data that divides evenly into chunk size
let data = Bytes::from(vec![1u8; 8192]);
let stream = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(data)]);
let mut rechunker = RechunkingStream::new(stream, 4096);
let mut output_chunks = Vec::new();
while let Some(chunk) = rechunker.next().await {
output_chunks.push(chunk.unwrap());
}
assert_eq!(output_chunks.len(), 2);
assert_eq!(output_chunks[0].len(), 4096);
assert_eq!(output_chunks[1].len(), 4096);
}
#[tokio::test]
async fn test_rechunking_stream_empty() {
let stream = futures_util::stream::iter(Vec::<Result<Bytes, std::io::Error>>::new());
let mut rechunker = RechunkingStream::new(stream, 4096);
let result = rechunker.next().await;
assert!(result.is_none());
}
#[tokio::test]
async fn test_rechunking_stream_preserves_data() {
// Verify data integrity through rechunking
let original: Vec<u8> = (0..=255).cycle().take(15000).collect();
let chunks: Vec<Result<Bytes, std::io::Error>> = original
.chunks(100) // 100-byte input chunks
.map(|c| Ok(Bytes::copy_from_slice(c)))
.collect();
let stream = futures_util::stream::iter(chunks);
let mut rechunker = RechunkingStream::new(stream, 4096);
let mut output = Vec::new();
while let Some(chunk) = rechunker.next().await {
output.extend_from_slice(&chunk.unwrap());
}
assert_eq!(output, original);
}
}
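A usage sketch of the unsigned encoder, mirroring the unit tests above (not part of the commit; it assumes the module is exported as minio::s3::aws_chunked): wrap an in-memory byte stream, collect the aws-chunked body, and size the request with calculate_encoded_length so Content-Length matches what the encoder emits.

use bytes::Bytes;
use futures_util::StreamExt;
use minio::s3::aws_chunked::{
    AwsChunkedEncoder, RechunkingStream, calculate_encoded_length, default_chunk_size,
};
use minio::s3::utils::ChecksumAlgorithm;

async fn encode_body(payload: Vec<u8>) -> (u64, Vec<u8>) {
    // Length the request would advertise for this payload and chunk size.
    let expected_len = calculate_encoded_length(
        payload.len() as u64,
        default_chunk_size(),
        ChecksumAlgorithm::CRC32,
    );
    let source = futures_util::stream::iter(vec![Ok::<_, std::io::Error>(Bytes::from(payload))]);
    // Rechunk first so the emitted chunk sizes match the length calculation.
    let rechunked = RechunkingStream::with_default_chunk_size(source);
    let mut encoder = AwsChunkedEncoder::new(rechunked, ChecksumAlgorithm::CRC32);
    let mut body = Vec::new();
    while let Some(chunk) = encoder.next().await {
        body.extend_from_slice(&chunk.expect("in-memory stream cannot fail"));
    }
    (expected_len, body)
}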

View File

@@ -26,7 +26,9 @@ use crate::s3::response_traits::HasObjectSize;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::Sse;
use crate::s3::types::{S3Api, S3Request, ToS3Request};
-use crate::s3::utils::{check_bucket_name, check_object_name, check_sse};
+use crate::s3::utils::{
+    ChecksumAlgorithm, check_bucket_name, check_object_name, check_sse, compute_checksum_sb,
+};
use http::Method;
use std::sync::Arc;
use typed_builder::TypedBuilder;
@@ -64,6 +66,14 @@ pub struct AppendObject {
/// Value of `x-amz-write-offset-bytes`.
#[builder(!default)] // force required
offset_bytes: u64,
+/// Optional checksum algorithm for data integrity verification during append.
+///
+/// When specified, computes a checksum of the appended data using the selected algorithm
+/// (CRC32, CRC32C, SHA1, SHA256, or CRC64NVME). The checksum is sent with the append
+/// operation and verified by the server.
+#[builder(default, setter(into))]
+checksum_algorithm: Option<ChecksumAlgorithm>,
}
impl S3Api for AppendObject {
@@ -83,6 +93,7 @@ pub type AppendObjectBldr = AppendObjectBuilder<(
(),
(Arc<SegmentedBytes>,),
(u64,),
+(),
)>;
impl ToS3Request for AppendObject {
@@ -94,6 +105,21 @@ impl ToS3Request for AppendObject {
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
headers.add(X_AMZ_WRITE_OFFSET_BYTES, self.offset_bytes.to_string());
+if let Some(algorithm) = self.checksum_algorithm {
+    let checksum_value = compute_checksum_sb(algorithm, &self.data);
+    headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
+    match algorithm {
+        ChecksumAlgorithm::CRC32 => headers.add(X_AMZ_CHECKSUM_CRC32, checksum_value),
+        ChecksumAlgorithm::CRC32C => headers.add(X_AMZ_CHECKSUM_CRC32C, checksum_value),
+        ChecksumAlgorithm::SHA1 => headers.add(X_AMZ_CHECKSUM_SHA1, checksum_value),
+        ChecksumAlgorithm::SHA256 => headers.add(X_AMZ_CHECKSUM_SHA256, checksum_value),
+        ChecksumAlgorithm::CRC64NVME => {
+            headers.add(X_AMZ_CHECKSUM_CRC64NVME, checksum_value)
+        }
+    }
+}
Ok(S3Request::builder()
.client(self.client)
.method(Method::PUT)
@@ -144,6 +170,13 @@ pub struct AppendObjectContent {
/// Value of `x-amz-write-offset-bytes`.
#[builder(default)]
offset_bytes: u64,
+/// Optional checksum algorithm for data integrity verification during append.
+///
+/// When specified, computes checksums for appended data using the selected algorithm
+/// (CRC32, CRC32C, SHA1, SHA256, or CRC64NVME). The checksum is computed for each
+/// chunk and sent with the append operation.
+#[builder(default, setter(into))]
+checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Builder type for [`AppendObjectContent`] that is returned by [`MinioClient::append_object_content`](crate::s3::client::MinioClient::append_object_content).
@@ -162,6 +195,7 @@ pub type AppendObjectContentBldr = AppendObjectContentBuilder<(
(),
(),
(),
+(),
)>;
impl AppendObjectContent {
@@ -229,6 +263,7 @@ impl AppendObjectContent {
offset_bytes: current_file_size,
sse: self.sse,
data: Arc::new(seg_bytes),
+checksum_algorithm: self.checksum_algorithm,
};
ao.send().await
} else if let Some(expected) = object_size.value()
@@ -296,6 +331,7 @@ impl AppendObjectContent {
sse: self.sse.clone(),
data: Arc::new(part_content),
offset_bytes: next_offset_bytes,
+checksum_algorithm: self.checksum_algorithm,
};
let resp: AppendObjectResponse = append_object.send().await?;
//println!("AppendObjectResponse: object_size={:?}", resp.object_size);

View File

@@ -23,12 +23,13 @@ use crate::s3::response::{
CopyObjectInternalResponse, CopyObjectResponse, CreateMultipartUploadResponse,
StatObjectResponse, UploadPartCopyResponse,
};
+use crate::s3::response_traits::HasChecksumHeaders;
use crate::s3::response_traits::HasEtagFromBody;
use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{Directive, PartInfo, Retention, S3Api, S3Request, ToS3Request};
use crate::s3::utils::{
-    UtcTime, check_bucket_name, check_object_name, check_sse, check_ssec, to_http_header_value,
-    to_iso8601utc, url_encode,
+    ChecksumAlgorithm, UtcTime, check_bucket_name, check_object_name, check_sse, check_ssec,
+    to_http_header_value, to_iso8601utc, url_encode,
};
use async_recursion::async_recursion;
use http::Method;
@@ -59,6 +60,13 @@ pub struct UploadPartCopy {
part_number: u16,
#[builder(default)]
headers: Multimap,
+/// Optional checksum algorithm for data integrity verification during part copy.
+///
+/// When specified, the server computes a checksum of the copied part data using
+/// this algorithm. Use the same algorithm for all parts in a multipart upload.
+/// Supported algorithms: CRC32, CRC32C, SHA1, SHA256, CRC64NVME.
+#[builder(default, setter(into))]
+checksum_algorithm: Option<crate::s3::utils::ChecksumAlgorithm>,
}
impl S3Api for UploadPartCopy {
@@ -78,6 +86,7 @@ pub type UploadPartCopyBldr = UploadPartCopyBuilder<(
(String,),
(),
(),
+(),
)>;
impl ToS3Request for UploadPartCopy {
@@ -100,6 +109,10 @@ impl ToS3Request for UploadPartCopy {
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
headers.add_multimap(self.headers);
+if let Some(algorithm) = self.checksum_algorithm {
+    headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
+}
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
{
query_params.add("partNumber", self.part_number.to_string());
@@ -150,6 +163,8 @@ pub struct CopyObjectInternal {
metadata_directive: Option<Directive>,
#[builder(default, setter(into))]
tagging_directive: Option<Directive>,
+#[builder(default, setter(into))]
+checksum_algorithm: Option<crate::s3::utils::ChecksumAlgorithm>,
}
impl S3Api for CopyObjectInternal {
@@ -175,6 +190,7 @@ pub type CopyObjectInternalBldr = CopyObjectInternalBuilder<(
(),
(),
(),
+(),
)>;
impl ToS3Request for CopyObjectInternal {
@@ -261,6 +277,10 @@ impl ToS3Request for CopyObjectInternal {
if let Some(v) = self.source.ssec {
headers.add_multimap(v.copy_headers());
}
+if let Some(algorithm) = self.checksum_algorithm {
+    headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
+}
};
Ok(S3Request::builder()
@@ -310,6 +330,13 @@ pub struct CopyObject {
metadata_directive: Option<Directive>,
#[builder(default, setter(into))]
tagging_directive: Option<Directive>,
+/// Optional checksum algorithm for data integrity verification during copy.
+///
+/// When specified, the server computes a checksum of the destination object using
+/// this algorithm during the copy operation. Supported algorithms: CRC32, CRC32C,
+/// SHA1, SHA256, CRC64NVME. The checksum value is included in response headers for verification.
+#[builder(default, setter(into))]
+checksum_algorithm: Option<crate::s3::utils::ChecksumAlgorithm>,
}
/// Builder type for [`CopyObject`] that is returned by [`MinioClient::copy_object`](crate::s3::client::MinioClient::copy_object).
@@ -331,6 +358,7 @@ pub type CopyObjectBldr = CopyObjectBuilder<(
(),
(),
(),
+(),
)>;
impl CopyObject {
@@ -434,6 +462,7 @@ impl CopyObject {
.source(self.source)
.metadata_directive(self.metadata_directive)
.tagging_directive(self.tagging_directive)
+.checksum_algorithm(self.checksum_algorithm)
.build()
.send()
.await?;
@@ -472,6 +501,8 @@ pub struct ComposeObjectInternal {
legal_hold: bool,
#[builder(default)]
sources: Vec<ComposeSource>,
+#[builder(default, setter(into))]
+checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Builder type for [`ComposeObjectInternal`] that is returned by `compose_object_internal` method.
@@ -491,6 +522,7 @@ pub type ComposeObjectInternalBldr = ComposeObjectInternalBuilder<(
(),
(),
(),
+(),
)>;
impl ComposeObjectInternal {
@@ -523,10 +555,10 @@ impl ComposeObjectInternal {
.legal_hold(self.legal_hold)
.source(
CopySource::builder()
-    .bucket(&self.bucket)
-    .object(&self.object)
+    .bucket(&sources[0].bucket)
+    .object(&sources[0].object)
.build(),
-) // TODO redundant use of bucket and object
+)
.build()
.send()
.await
@@ -554,6 +586,7 @@ impl ComposeObjectInternal {
.extra_query_params(self.extra_query_params.clone())
.region(self.region.clone())
.extra_headers(Some(headers))
+.checksum_algorithm(self.checksum_algorithm)
.build()
.send()
.await
@@ -612,6 +645,7 @@ impl ComposeObjectInternal {
.region(self.region.clone())
.part_number(part_number)
.headers(headers)
+.checksum_algorithm(self.checksum_algorithm)
.build()
.send()
.await
@@ -625,11 +659,10 @@ impl ComposeObjectInternal {
Err(e) => return (Err(e.into()), upload_id),
};
-parts.push(PartInfo {
-    number: part_number,
-    etag,
-    size,
-});
+let checksum = self
+    .checksum_algorithm
+    .and_then(|alg| resp.get_checksum(alg).map(|v| (alg, v)));
+parts.push(PartInfo::new(part_number, etag, size, checksum));
} else {
let part_ranges = calculate_part_ranges(offset, size, MAX_PART_SIZE);
for (part_offset, length) in part_ranges {
@@ -648,6 +681,7 @@ impl ComposeObjectInternal {
.region(self.region.clone())
.part_number(part_number)
.headers(headers_copy)
+.checksum_algorithm(self.checksum_algorithm)
.build()
.send()
.await
@@ -661,11 +695,10 @@ impl ComposeObjectInternal {
Err(e) => return (Err(e.into()), upload_id),
};
-parts.push(PartInfo {
-    number: part_number,
-    etag,
-    size: length,
-});
+let checksum = self
+    .checksum_algorithm
+    .and_then(|alg| resp.get_checksum(alg).map(|v| (alg, v)));
+parts.push(PartInfo::new(part_number, etag, length, checksum));
}
}
}
@@ -725,6 +758,8 @@ pub struct ComposeObject {
legal_hold: bool,
#[builder(default)]
sources: Vec<ComposeSource>,
+#[builder(default, setter(into))]
+checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Builder type for [`ComposeObject`] that is returned by [`MinioClient::compose_object`](crate::s3::client::MinioClient::compose_object).
@@ -744,6 +779,7 @@ pub type ComposeObjectBldr = ComposeObjectBuilder<(
(),
(),
(Vec<ComposeSource>,),
+(),
)>;
impl ComposeObject {
@@ -766,6 +802,7 @@ impl ComposeObject {
.retention(self.retention)
.legal_hold(self.legal_hold)
.sources(self.sources)
+.checksum_algorithm(self.checksum_algorithm)
.build()
.send()
.await;
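The two parts.push call sites above are where per-part checksums enter the multipart flow: resp.get_checksum(alg) yields the value and PartInfo::new records it alongside the part number, ETag, and size. A data-flow sketch under those assumptions (the constructor arguments mirror the calls above, the checksum shape Option<(ChecksumAlgorithm, String)> is inferred from them, and the sample value sOO8/Q== is the one used in the aws_chunked module docs); CompleteMultipartUpload, in the next file, serializes it back out as a per-part Checksum element:

// Not part of the commit: assumes PartInfo::new(number, etag, size, checksum)
// with checksum: Option<(ChecksumAlgorithm, String)>, as the calls above imply.
let checksum = Some((ChecksumAlgorithm::CRC32C, "sOO8/Q==".to_string()));
let part = PartInfo::new(1, "etag-of-part-1".to_string(), 5 * 1024 * 1024, checksum);
// Contribution to the CompleteMultipartUpload XML body:
// <Part><PartNumber>1</PartNumber><ETag>etag-of-part-1</ETag>
//   <ChecksumCRC32C>sOO8/Q==</ChecksumCRC32C></Part>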

View File

@ -23,12 +23,14 @@ use crate::s3::response::{
AbortMultipartUploadResponse, CompleteMultipartUploadResponse, CreateMultipartUploadResponse, AbortMultipartUploadResponse, CompleteMultipartUploadResponse, CreateMultipartUploadResponse,
PutObjectContentResponse, PutObjectResponse, UploadPartResponse, PutObjectContentResponse, PutObjectResponse, UploadPartResponse,
}; };
use crate::s3::response_traits::HasEtagFromHeaders; use crate::s3::response_traits::{HasChecksumHeaders, HasEtagFromHeaders};
use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::Sse; use crate::s3::sse::Sse;
use crate::s3::types::{PartInfo, Retention, S3Api, S3Request, ToS3Request}; use crate::s3::types::{PartInfo, Retention, S3Api, S3Request, ToS3Request};
use crate::s3::utils::{
ChecksumAlgorithm, check_object_name, check_sse, compute_checksum_sb, insert,
};
use crate::s3::utils::{check_bucket_name, md5sum_hash, to_iso8601utc, url_encode}; use crate::s3::utils::{check_bucket_name, md5sum_hash, to_iso8601utc, url_encode};
use crate::s3::utils::{check_object_name, check_sse, insert};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use http::Method; use http::Method;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
@ -65,6 +67,13 @@ pub struct CreateMultipartUpload {
legal_hold: bool, legal_hold: bool,
#[builder(default, setter(into))] #[builder(default, setter(into))]
content_type: Option<String>, content_type: Option<String>,
/// Optional checksum algorithm to use for data integrity verification.
///
/// When specified, the server will compute checksums for each uploaded part
/// using this algorithm. Supported algorithms: CRC32, CRC32C, SHA1, SHA256, CRC64NVME.
/// The checksum is included in response headers for verification.
#[builder(default, setter(into))]
checksum_algorithm: Option<ChecksumAlgorithm>,
} }
/// Builder type for [`CreateMultipartUpload`] that is returned by [`MinioClient::create_multipart_upload`](crate::s3::client::MinioClient::create_multipart_upload). /// Builder type for [`CreateMultipartUpload`] that is returned by [`MinioClient::create_multipart_upload`](crate::s3::client::MinioClient::create_multipart_upload).
@ -83,6 +92,7 @@ pub type CreateMultipartUploadBldr = CreateMultipartUploadBuilder<(
(), (),
(), (),
(), (),
(),
)>; )>;
impl S3Api for CreateMultipartUpload { impl S3Api for CreateMultipartUpload {
@ -94,7 +104,7 @@ impl ToS3Request for CreateMultipartUpload {
check_bucket_name(&self.bucket, true)?; check_bucket_name(&self.bucket, true)?;
check_object_name(&self.object)?; check_object_name(&self.object)?;
let headers: Multimap = into_headers_put_object( let mut headers: Multimap = into_headers_put_object(
self.extra_headers, self.extra_headers,
self.user_metadata, self.user_metadata,
self.sse, self.sse,
@ -104,6 +114,10 @@ impl ToS3Request for CreateMultipartUpload {
self.content_type, self.content_type,
)?; )?;
if let Some(algorithm) = self.checksum_algorithm {
headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
}
Ok(S3Request::builder() Ok(S3Request::builder()
.client(self.client) .client(self.client)
.method(Method::POST) .method(Method::POST)
@ -197,6 +211,14 @@ pub struct CompleteMultipartUpload {
upload_id: String, upload_id: String,
#[builder(!default)] // force required #[builder(!default)] // force required
parts: Vec<PartInfo>, parts: Vec<PartInfo>,
/// Optional checksum algorithm used during multipart upload.
///
/// When specified and all parts were uploaded with the same checksum algorithm,
/// the server will compute a composite checksum (checksum-of-checksums) for the
/// entire object. This must match the algorithm used in CreateMultipartUpload
/// and all UploadPart operations. Supported algorithms: CRC32, CRC32C, SHA1, SHA256, CRC64NVME.
#[builder(default, setter(into))]
checksum_algorithm: Option<ChecksumAlgorithm>,
} }
/// Builder type for [`CompleteMultipartUpload`] that is returned by [`MinioClient::complete_multipart_upload`](crate::s3::client::MinioClient::complete_multipart_upload). /// Builder type for [`CompleteMultipartUpload`] that is returned by [`MinioClient::complete_multipart_upload`](crate::s3::client::MinioClient::complete_multipart_upload).
@ -211,6 +233,7 @@ pub type CompleteMultipartUploadBldr = CompleteMultipartUploadBuilder<(
(String,), (String,),
(String,), (String,),
(Vec<PartInfo>,), (Vec<PartInfo>,),
(),
)>; )>;
impl S3Api for CompleteMultipartUpload { impl S3Api for CompleteMultipartUpload {
@ -235,14 +258,37 @@ impl ToS3Request for CompleteMultipartUpload {
// Set the capacity of the byte-buffer based on the part count - attempting // Set the capacity of the byte-buffer based on the part count - attempting
// to avoid extra allocations when building the XML payload. // to avoid extra allocations when building the XML payload.
let bytes: Bytes = { let bytes: Bytes = {
let mut data = BytesMut::with_capacity(100 * self.parts.len() + 100); let mut data = BytesMut::with_capacity(200 * self.parts.len() + 100);
data.extend_from_slice(b"<CompleteMultipartUpload>"); data.extend_from_slice(b"<CompleteMultipartUpload>");
for part in self.parts.iter() { for part in self.parts.iter() {
data.extend_from_slice(b"<Part><PartNumber>"); data.extend_from_slice(b"<Part><PartNumber>");
data.extend_from_slice(part.number.to_string().as_bytes()); data.extend_from_slice(part.number.to_string().as_bytes());
data.extend_from_slice(b"</PartNumber><ETag>"); data.extend_from_slice(b"</PartNumber><ETag>");
data.extend_from_slice(part.etag.as_bytes()); data.extend_from_slice(part.etag.as_bytes());
data.extend_from_slice(b"</ETag></Part>"); data.extend_from_slice(b"</ETag>");
if let Some((algorithm, ref value)) = part.checksum {
let (open_tag, close_tag) = match algorithm {
ChecksumAlgorithm::CRC32 => {
(&b"<ChecksumCRC32>"[..], &b"</ChecksumCRC32>"[..])
}
ChecksumAlgorithm::CRC32C => {
(&b"<ChecksumCRC32C>"[..], &b"</ChecksumCRC32C>"[..])
}
ChecksumAlgorithm::SHA1 => {
(&b"<ChecksumSHA1>"[..], &b"</ChecksumSHA1>"[..])
}
ChecksumAlgorithm::SHA256 => {
(&b"<ChecksumSHA256>"[..], &b"</ChecksumSHA256>"[..])
}
ChecksumAlgorithm::CRC64NVME => {
(&b"<ChecksumCRC64NVME>"[..], &b"</ChecksumCRC64NVME>"[..])
}
};
data.extend_from_slice(open_tag);
data.extend_from_slice(value.as_bytes());
data.extend_from_slice(close_tag);
}
data.extend_from_slice(b"</Part>");
} }
data.extend_from_slice(b"</CompleteMultipartUpload>"); data.extend_from_slice(b"</CompleteMultipartUpload>");
data.freeze() data.freeze()
@ -252,6 +298,10 @@ impl ToS3Request for CompleteMultipartUpload {
{ {
headers.add(CONTENT_TYPE, "application/xml"); headers.add(CONTENT_TYPE, "application/xml");
headers.add(CONTENT_MD5, md5sum_hash(bytes.as_ref())); headers.add(CONTENT_MD5, md5sum_hash(bytes.as_ref()));
if let Some(algorithm) = self.checksum_algorithm {
headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
}
} }
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default(); let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
query_params.add("uploadId", self.upload_id); query_params.add("uploadId", self.upload_id);
@ -311,6 +361,35 @@ pub struct UploadPart {
upload_id: Option<String>, upload_id: Option<String>,
#[builder(default, setter(into))] // force required #[builder(default, setter(into))] // force required
part_number: Option<u16>, part_number: Option<u16>,
/// Optional checksum algorithm to use for this part's data integrity verification.
///
/// When specified, computes a checksum of the part data using the selected algorithm
/// (CRC32, CRC32C, SHA1, SHA256, or CRC64NVME). The checksum is sent with the upload
/// and verified by the server. For multipart uploads, use the same algorithm for all parts.
#[builder(default, setter(into))]
checksum_algorithm: Option<ChecksumAlgorithm>,
/// When true and `checksum_algorithm` is set, uses trailing checksums.
///
/// Trailing checksums use aws-chunked encoding where the checksum is computed
/// incrementally while streaming and appended at the end of the request body.
/// This avoids buffering the entire content to compute the checksum upfront.
///
/// Defaults to false for backwards compatibility.
#[builder(default = false)]
use_trailing_checksum: bool,
/// When true, signs each chunk with AWS Signature V4 for streaming uploads.
///
/// Requires `use_trailing_checksum` to be true and `checksum_algorithm` to be set.
/// Uses STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER where each chunk is signed
/// and the trailer includes a trailer signature. This provides additional
/// integrity verification at the protocol level.
///
/// Defaults to false for backwards compatibility.
#[builder(default = false)]
use_signed_streaming: bool,
} }
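// Hedged sketch (not library code): how the three new fields above combine into a
// request mode, mirroring the documented rules (trailing checksums require an
// algorithm; signed streaming additionally requires trailing checksums).
fn upload_part_mode(has_checksum: bool, trailing: bool, signed: bool) -> &'static str {
    match (has_checksum, trailing, signed) {
        (false, _, _) => "plain upload, no checksum headers",
        (true, false, _) => "upfront checksum sent in an x-amz-checksum-* header",
        (true, true, false) => "aws-chunked body, STREAMING-UNSIGNED-PAYLOAD-TRAILER",
        (true, true, true) => "aws-chunked body, STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER",
    }
}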
/// Builder type for [`UploadPart`] that is returned by [`MinioClient::upload_part`](crate::s3::client::MinioClient::upload_part). /// Builder type for [`UploadPart`] that is returned by [`MinioClient::upload_part`](crate::s3::client::MinioClient::upload_part).
@ -332,6 +411,9 @@ pub type UploadPartBldr = UploadPartBuilder<(
(), (),
(Option<String>,), (Option<String>,),
(Option<u16>,), (Option<u16>,),
(),
(),
(),
)>; )>;
impl S3Api for UploadPart { impl S3Api for UploadPart {
@ -360,7 +442,7 @@ impl ToS3Request for UploadPart {
} }
} }
let headers: Multimap = into_headers_put_object( let mut headers: Multimap = into_headers_put_object(
self.extra_headers, self.extra_headers,
self.user_metadata, self.user_metadata,
self.sse, self.sse,
@ -370,6 +452,31 @@ impl ToS3Request for UploadPart {
self.content_type, self.content_type,
)?; )?;
// Determine if we're using trailing checksums
let trailing_checksum = if self.use_trailing_checksum && self.checksum_algorithm.is_some() {
self.checksum_algorithm
} else {
None
};
// For upfront checksums (not trailing), compute and add to headers
if let Some(algorithm) = self.checksum_algorithm
&& !self.use_trailing_checksum
{
let checksum_value = compute_checksum_sb(algorithm, &self.data);
headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
match algorithm {
ChecksumAlgorithm::CRC32 => headers.add(X_AMZ_CHECKSUM_CRC32, checksum_value),
ChecksumAlgorithm::CRC32C => headers.add(X_AMZ_CHECKSUM_CRC32C, checksum_value),
ChecksumAlgorithm::SHA1 => headers.add(X_AMZ_CHECKSUM_SHA1, checksum_value),
ChecksumAlgorithm::SHA256 => headers.add(X_AMZ_CHECKSUM_SHA256, checksum_value),
ChecksumAlgorithm::CRC64NVME => {
headers.add(X_AMZ_CHECKSUM_CRC64NVME, checksum_value)
}
}
}
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default(); let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
if let Some(upload_id) = self.upload_id { if let Some(upload_id) = self.upload_id {
@ -388,6 +495,8 @@ impl ToS3Request for UploadPart {
.object(self.object) .object(self.object)
.headers(headers) .headers(headers)
.body(self.data) .body(self.data)
.trailing_checksum(trailing_checksum)
.use_signed_streaming(self.use_signed_streaming)
.build()) .build())
} }
} }
@ -454,6 +563,29 @@ pub struct PutObjectContent {
part_size: Size, part_size: Size,
#[builder(default, setter(into))] #[builder(default, setter(into))]
content_type: Option<String>, content_type: Option<String>,
#[builder(default, setter(into))]
checksum_algorithm: Option<ChecksumAlgorithm>,
/// When true and `checksum_algorithm` is set, uses trailing checksums.
///
/// Trailing checksums use aws-chunked encoding where the checksum is computed
/// incrementally while streaming and appended at the end of the request body.
/// This avoids buffering the entire content to compute the checksum upfront.
///
/// Defaults to false for backwards compatibility.
#[builder(default = false)]
use_trailing_checksum: bool,
/// When true, signs each chunk with AWS Signature V4 for streaming uploads.
///
/// Requires `use_trailing_checksum` to be true and `checksum_algorithm` to be set.
/// Uses STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER where each chunk is signed
/// and the trailer includes a trailer signature. This provides additional
/// integrity verification at the protocol level.
///
/// Defaults to false for backwards compatibility.
#[builder(default = false)]
use_signed_streaming: bool,
// source data // source data
#[builder(!default, setter(into))] // force required + accept Into<String> #[builder(!default, setter(into))] // force required + accept Into<String>
@ -484,6 +616,9 @@ pub type PutObjectContentBldr = PutObjectContentBuilder<(
(), (),
(), (),
(), (),
(),
(),
(),
(ObjectContent,), (ObjectContent,),
)>; )>;
@ -542,6 +677,9 @@ impl PutObjectContent {
upload_id: None, upload_id: None,
data: Arc::new(seg_bytes), data: Arc::new(seg_bytes),
content_type: self.content_type.clone(), content_type: self.content_type.clone(),
checksum_algorithm: self.checksum_algorithm,
use_trailing_checksum: self.use_trailing_checksum,
use_signed_streaming: self.use_signed_streaming,
}) })
.build() .build()
.send() .send()
@ -572,6 +710,7 @@ impl PutObjectContent {
.retention(self.retention.clone()) .retention(self.retention.clone())
.legal_hold(self.legal_hold) .legal_hold(self.legal_hold)
.content_type(self.content_type.clone()) .content_type(self.content_type.clone())
.checksum_algorithm(self.checksum_algorithm)
.build() .build()
.send() .send()
.await?; .await?;
@ -667,15 +806,22 @@ impl PutObjectContent {
upload_id: Some(upload_id.to_string()), upload_id: Some(upload_id.to_string()),
data: Arc::new(part_content), data: Arc::new(part_content),
content_type: self.content_type.clone(), content_type: self.content_type.clone(),
checksum_algorithm: self.checksum_algorithm,
use_trailing_checksum: self.use_trailing_checksum,
use_signed_streaming: self.use_signed_streaming,
} }
.send() .send()
.await?; .await?;
parts.push(PartInfo { let checksum = self
number: part_number, .checksum_algorithm
etag: resp.etag()?, .and_then(|alg| resp.get_checksum(alg).map(|v| (alg, v)));
size: buffer_size, parts.push(PartInfo::new(
}); part_number,
resp.etag()?,
buffer_size,
checksum,
));
// Finally, check if we are done. // Finally, check if we are done.
if buffer_size < part_size { if buffer_size < part_size {
@ -705,6 +851,7 @@ impl PutObjectContent {
region: self.region, region: self.region,
parts, parts,
upload_id, upload_id,
checksum_algorithm: self.checksum_algorithm,
} }
.send() .send()
.await?; .await?;
View File
@ -57,8 +57,10 @@ use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::*; use crate::s3::response::*;
use crate::s3::response_traits::{HasEtagFromHeaders, HasS3Fields}; use crate::s3::response_traits::{HasEtagFromHeaders, HasS3Fields};
use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::signer::{SigningKeyCache, sign_v4_s3}; use crate::s3::signer::{SigningKeyCache, sign_v4_s3, sign_v4_s3_with_context};
use crate::s3::utils::{EMPTY_SHA256, check_ssec_with_log, sha256_hash_sb, to_amz_date, utc_now}; use crate::s3::utils::{
ChecksumAlgorithm, EMPTY_SHA256, check_ssec_with_log, sha256_hash_sb, to_amz_date, utc_now,
};
mod append_object; mod append_object;
mod bucket_exists; mod bucket_exists;
@ -663,8 +665,14 @@ impl MinioClient {
bucket_name: Option<&str>, bucket_name: Option<&str>,
object_name: Option<&str>, object_name: Option<&str>,
body: Option<Arc<SegmentedBytes>>, body: Option<Arc<SegmentedBytes>>,
trailing_checksum: Option<ChecksumAlgorithm>,
use_signed_streaming: bool,
retry: bool, retry: bool,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
use crate::s3::aws_chunked::{
AwsChunkedEncoder, RechunkingStream, SignedAwsChunkedEncoder,
};
let mut url = self.shared.base_url.build_url( let mut url = self.shared.base_url.build_url(
method, method,
region, region,
@ -675,14 +683,55 @@ impl MinioClient {
let mut extensions = http::Extensions::default(); let mut extensions = http::Extensions::default();
headers.add(HOST, url.host_header_value()); headers.add(HOST, url.host_header_value());
// Determine if we're using trailing checksums (signed or unsigned)
let use_trailing = trailing_checksum.is_some()
&& matches!(*method, Method::PUT | Method::POST)
&& body.is_some();
let use_signed_trailing = use_trailing && use_signed_streaming;
let sha256: String = match *method { let sha256: String = match *method {
Method::PUT | Method::POST => { Method::PUT | Method::POST => {
if !headers.contains_key(CONTENT_TYPE) { if !headers.contains_key(CONTENT_TYPE) {
// Empty body with Content-Type can cause some MinIO versions to expect XML // Empty body with Content-Type can cause some MinIO versions to expect XML
headers.add(CONTENT_TYPE, "application/octet-stream"); headers.add(CONTENT_TYPE, "application/octet-stream");
} }
let len: usize = body.as_ref().map_or(0, |b| b.len()); let raw_len: usize = body.as_ref().map_or(0, |b| b.len());
headers.add(CONTENT_LENGTH, len.to_string());
if use_trailing {
// For trailing checksums, use aws-chunked encoding
let algorithm = trailing_checksum.unwrap();
// Set headers for aws-chunked encoding
headers.add(CONTENT_ENCODING, "aws-chunked");
headers.add(X_AMZ_DECODED_CONTENT_LENGTH, raw_len.to_string());
headers.add(X_AMZ_TRAILER, algorithm.header_name());
// Calculate the encoded length for Content-Length
let encoded_len = if use_signed_trailing {
crate::s3::aws_chunked::calculate_signed_encoded_length(
raw_len as u64,
crate::s3::aws_chunked::default_chunk_size(),
algorithm,
)
} else {
crate::s3::aws_chunked::calculate_encoded_length(
raw_len as u64,
crate::s3::aws_chunked::default_chunk_size(),
algorithm,
)
};
headers.add(CONTENT_LENGTH, encoded_len.to_string());
// Use appropriate Content-SHA256 value
if use_signed_trailing {
STREAMING_AWS4_HMAC_SHA256_PAYLOAD_TRAILER.into()
} else {
STREAMING_UNSIGNED_PAYLOAD_TRAILER.into()
}
} else {
// Standard upfront checksum
headers.add(CONTENT_LENGTH, raw_len.to_string());
match body { match body {
None => EMPTY_SHA256.into(), None => EMPTY_SHA256.into(),
Some(ref v) => { Some(ref v) => {
@ -691,6 +740,7 @@ impl MinioClient {
} }
} }
} }
}
_ => EMPTY_SHA256.into(), _ => EMPTY_SHA256.into(),
}; };
headers.add(X_AMZ_CONTENT_SHA256, sha256.clone()); headers.add(X_AMZ_CONTENT_SHA256, sha256.clone());
@ -723,11 +773,29 @@ impl MinioClient {
headers.add("x-minio-redirect-to", url.to_string()); headers.add("x-minio-redirect-to", url.to_string());
} }
if let Some(p) = &self.shared.provider { // For signed streaming, we need the signing context for chunk signatures
let chunk_signing_context = if let Some(p) = &self.shared.provider {
let creds = p.fetch(); let creds = p.fetch();
if creds.session_token.is_some() { if creds.session_token.is_some() {
headers.add(X_AMZ_SECURITY_TOKEN, creds.session_token.unwrap()); headers.add(X_AMZ_SECURITY_TOKEN, creds.session_token.unwrap());
} }
if use_signed_trailing {
// Use the version that returns chunk signing context
Some(sign_v4_s3_with_context(
&self.shared.signing_key_cache,
method,
&url.path,
region,
headers,
query_params,
&creds.access_key,
&creds.secret_key,
&sha256,
date,
))
} else {
// Standard signing without context
sign_v4_s3( sign_v4_s3(
&self.shared.signing_key_cache, &self.shared.signing_key_cache,
method, method,
@ -740,7 +808,11 @@ impl MinioClient {
&sha256, &sha256,
date, date,
); );
None
} }
} else {
None
};
let mut req = self.http_client.request(method.clone(), url.to_string()); let mut req = self.http_client.request(method.clone(), url.to_string());
@ -766,8 +838,28 @@ impl MinioClient {
None => BodyIterator::Empty(std::iter::empty()), None => BodyIterator::Empty(std::iter::empty()),
}; };
let stream = futures_util::stream::iter(iter.map(|b| -> Result<_, Error> { Ok(b) })); let stream = futures_util::stream::iter(iter.map(|b| -> Result<_, Error> { Ok(b) }));
if use_signed_trailing {
// Wrap stream with signed aws-chunked encoder for trailing checksum.
// Re-chunk the stream to match the chunk size used in Content-Length calculation,
// ensuring the actual encoded bytes match the declared Content-Length.
let algorithm = trailing_checksum.unwrap();
let context =
chunk_signing_context.expect("signing context required for signed streaming");
let rechunked = RechunkingStream::with_default_chunk_size(stream);
let encoder = SignedAwsChunkedEncoder::new(rechunked, algorithm, context);
req = req.body(Body::wrap_stream(encoder));
} else if use_trailing {
// Wrap stream with unsigned aws-chunked encoder for trailing checksum.
// Re-chunk the stream to match the chunk size used in Content-Length calculation.
let algorithm = trailing_checksum.unwrap();
let rechunked = RechunkingStream::with_default_chunk_size(stream);
let encoder = AwsChunkedEncoder::new(rechunked, algorithm);
req = req.body(Body::wrap_stream(encoder));
} else {
req = req.body(Body::wrap_stream(stream)); req = req.body(Body::wrap_stream(stream));
} }
}
let resp = req.send().await; let resp = req.send().await;
@ -825,6 +917,8 @@ impl MinioClient {
bucket_name: &Option<&str>, bucket_name: &Option<&str>,
object_name: &Option<&str>, object_name: &Option<&str>,
data: Option<Arc<SegmentedBytes>>, data: Option<Arc<SegmentedBytes>>,
trailing_checksum: Option<ChecksumAlgorithm>,
use_signed_streaming: bool,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
let resp: Result<reqwest::Response, Error> = self let resp: Result<reqwest::Response, Error> = self
.execute_internal( .execute_internal(
@ -835,6 +929,8 @@ impl MinioClient {
bucket_name.as_deref(), bucket_name.as_deref(),
object_name.as_deref(), object_name.as_deref(),
data.as_ref().map(Arc::clone), data.as_ref().map(Arc::clone),
trailing_checksum,
use_signed_streaming,
true, true,
) )
.await; .await;
@ -859,6 +955,8 @@ impl MinioClient {
bucket_name.as_deref(), bucket_name.as_deref(),
object_name.as_deref(), object_name.as_deref(),
data, data,
trailing_checksum,
use_signed_streaming,
false, false,
) )
.await .await
View File
@ -148,6 +148,9 @@ pub enum ValidationErr {
got: u32, got: u32,
}, },
#[error("Checksum mismatch; expected: {expected}, computed: {computed}")]
ChecksumMismatch { expected: String, computed: String },
#[error("Unknown event type: {0}")] #[error("Unknown event type: {0}")]
UnknownEventType(String), UnknownEventType(String),
View File
@ -15,6 +15,7 @@
//! Implementation of Simple Storage Service (aka S3) client //! Implementation of Simple Storage Service (aka S3) client
pub mod aws_chunked;
pub mod builders; pub mod builders;
pub mod client; pub mod client;
pub mod creds; pub mod creds;
View File
@ -14,7 +14,8 @@
// limitations under the License. // limitations under the License.
use crate::s3::response_traits::{ use crate::s3::response_traits::{
HasBucket, HasEtagFromHeaders, HasObject, HasObjectSize, HasRegion, HasVersion, HasBucket, HasChecksumHeaders, HasEtagFromHeaders, HasObject, HasObjectSize, HasRegion,
HasVersion,
}; };
use crate::s3::types::S3Request; use crate::s3::types::S3Request;
use crate::{impl_from_s3response, impl_has_s3fields}; use crate::{impl_from_s3response, impl_has_s3fields};
@ -40,3 +41,4 @@ impl HasRegion for AppendObjectResponse {}
impl HasVersion for AppendObjectResponse {} impl HasVersion for AppendObjectResponse {}
impl HasEtagFromHeaders for AppendObjectResponse {} impl HasEtagFromHeaders for AppendObjectResponse {}
impl HasObjectSize for AppendObjectResponse {} impl HasObjectSize for AppendObjectResponse {}
impl HasChecksumHeaders for AppendObjectResponse {}
View File
@ -13,7 +13,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use crate::s3::response_traits::{HasBucket, HasEtagFromBody, HasObject, HasRegion, HasVersion}; use crate::s3::response_traits::{
HasBucket, HasChecksumHeaders, HasEtagFromBody, HasObject, HasRegion, HasVersion,
};
use crate::s3::types::S3Request; use crate::s3::types::S3Request;
use crate::{impl_from_s3response, impl_has_s3fields}; use crate::{impl_from_s3response, impl_has_s3fields};
use bytes::Bytes; use bytes::Bytes;
@ -35,6 +37,7 @@ impl HasObject for S3Response2 {}
impl HasRegion for S3Response2 {} impl HasRegion for S3Response2 {}
impl HasVersion for S3Response2 {} impl HasVersion for S3Response2 {}
impl HasEtagFromBody for S3Response2 {} impl HasEtagFromBody for S3Response2 {}
impl HasChecksumHeaders for S3Response2 {}
/// Represents the response of the `upload_part_copy` API call. /// Represents the response of the `upload_part_copy` API call.
/// This struct contains metadata and information about the part being copied during a multipart upload. /// This struct contains metadata and information about the part being copied during a multipart upload.
View File
@ -16,12 +16,22 @@
use crate::impl_has_s3fields; use crate::impl_has_s3fields;
use crate::s3::builders::ObjectContent; use crate::s3::builders::ObjectContent;
use crate::s3::error::{Error, ValidationErr}; use crate::s3::error::{Error, ValidationErr};
use crate::s3::response_traits::{HasBucket, HasEtagFromHeaders, HasObject, HasRegion, HasVersion}; use crate::s3::response_traits::{
HasBucket, HasChecksumHeaders, HasEtagFromHeaders, HasObject, HasRegion, HasVersion,
};
use crate::s3::types::{FromS3Response, S3Request}; use crate::s3::types::{FromS3Response, S3Request};
use crate::s3::utils::{ChecksumAlgorithm, b64_encode, compute_checksum};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures_util::TryStreamExt; use crc_fast::{CrcAlgorithm, Digest as CrcFastDigest};
use futures_util::{Stream, TryStreamExt};
use http::HeaderMap; use http::HeaderMap;
#[cfg(feature = "ring")]
use ring::digest::{Context, SHA256};
use sha1::{Digest as Sha1Digest, Sha1};
#[cfg(not(feature = "ring"))]
use sha2::Sha256;
use std::io;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
@ -30,12 +40,174 @@ pub type BoxedByteStream = (
Pin<Box<dyn futures_util::Stream<Item = std::io::Result<Bytes>> + Send>>, Pin<Box<dyn futures_util::Stream<Item = std::io::Result<Bytes>> + Send>>,
u64, u64,
); );
use std::task::{Context as TaskContext, Poll};
/// Stateful checksum hasher for streaming verification.
///
/// This enum provides incremental checksum computation across multiple data chunks,
/// enabling efficient verification of large objects without loading them entirely into memory.
/// Each variant wraps the appropriate hasher implementation for its algorithm.
///
/// The hasher is used internally by [`GetObjectResponse::content()`] to verify checksums
/// transparently during streaming, with minimal performance overhead.
enum ChecksumHasher {
Crc32(CrcFastDigest),
Crc32c(CrcFastDigest),
Crc64nvme(CrcFastDigest),
Sha1(Sha1),
#[cfg(feature = "ring")]
Sha256(Context),
#[cfg(not(feature = "ring"))]
Sha256(Sha256),
}
impl ChecksumHasher {
/// Creates a new checksum hasher for the specified algorithm.
///
/// Initializes the appropriate hasher implementation with cached instances
/// for CRC variants to optimize performance.
///
/// # Arguments
///
/// * `algorithm` - The checksum algorithm to use for verification
fn new(algorithm: ChecksumAlgorithm) -> Self {
match algorithm {
ChecksumAlgorithm::CRC32 => {
ChecksumHasher::Crc32(CrcFastDigest::new(CrcAlgorithm::Crc32IsoHdlc))
}
ChecksumAlgorithm::CRC32C => {
ChecksumHasher::Crc32c(CrcFastDigest::new(CrcAlgorithm::Crc32Iscsi))
}
ChecksumAlgorithm::CRC64NVME => {
ChecksumHasher::Crc64nvme(CrcFastDigest::new(CrcAlgorithm::Crc64Nvme))
}
ChecksumAlgorithm::SHA1 => ChecksumHasher::Sha1(Sha1::new()),
#[cfg(feature = "ring")]
ChecksumAlgorithm::SHA256 => ChecksumHasher::Sha256(Context::new(&SHA256)),
#[cfg(not(feature = "ring"))]
ChecksumAlgorithm::SHA256 => ChecksumHasher::Sha256(Sha256::new()),
}
}
/// Updates the checksum computation with a new chunk of data.
///
/// This method is called incrementally as data streams through, allowing
/// verification without buffering the entire object in memory.
///
/// # Arguments
///
/// * `data` - The next chunk of data to include in the checksum
fn update(&mut self, data: &[u8]) {
match self {
ChecksumHasher::Crc32(digest) => digest.update(data),
ChecksumHasher::Crc32c(digest) => digest.update(data),
ChecksumHasher::Crc64nvme(digest) => digest.update(data),
ChecksumHasher::Sha1(hasher) => hasher.update(data),
#[cfg(feature = "ring")]
ChecksumHasher::Sha256(ctx) => ctx.update(data),
#[cfg(not(feature = "ring"))]
ChecksumHasher::Sha256(hasher) => hasher.update(data),
}
}
/// Completes the checksum computation and returns the base64-encoded result.
///
/// This consumes the hasher and produces the final checksum value in the format
/// expected by S3 headers (base64-encoded). The result can be compared directly
/// with the checksum value from response headers.
///
/// # Returns
///
/// Base64-encoded checksum string matching the S3 header format.
fn finalize(self) -> String {
match self {
// crc-fast returns u64 for all algorithms; CRC32 variants need cast to u32
ChecksumHasher::Crc32(digest) => b64_encode((digest.finalize() as u32).to_be_bytes()),
ChecksumHasher::Crc32c(digest) => b64_encode((digest.finalize() as u32).to_be_bytes()),
ChecksumHasher::Crc64nvme(digest) => b64_encode(digest.finalize().to_be_bytes()),
ChecksumHasher::Sha1(hasher) => {
let result = hasher.finalize();
b64_encode(&result[..])
}
#[cfg(feature = "ring")]
ChecksumHasher::Sha256(ctx) => b64_encode(ctx.finish().as_ref()),
#[cfg(not(feature = "ring"))]
ChecksumHasher::Sha256(hasher) => {
let result = hasher.finalize();
b64_encode(&result[..])
}
}
}
}
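// Hedged sketch of the incremental pattern the hasher above wraps, using the
// crc-fast API this diff already depends on: feed chunks as they arrive and
// finalize once at end of stream.
fn crc32c_of_chunks(chunks: &[&[u8]]) -> u32 {
    use crc_fast::{CrcAlgorithm, Digest};
    let mut digest = Digest::new(CrcAlgorithm::Crc32Iscsi);
    for &chunk in chunks {
        digest.update(chunk);
    }
    // crc-fast returns u64 for every algorithm; CRC32 variants fit in u32.
    digest.finalize() as u32
}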
/// A stream wrapper that computes checksum incrementally while streaming data
struct ChecksumVerifyingStream<S> {
inner: S,
hasher: Option<ChecksumHasher>,
expected_checksum: String,
finished: bool,
}
impl<S> ChecksumVerifyingStream<S>
where
S: Stream<Item = Result<Bytes, reqwest::Error>> + Unpin,
{
fn new(stream: S, algorithm: ChecksumAlgorithm, expected_checksum: String) -> Self {
Self {
inner: stream,
hasher: Some(ChecksumHasher::new(algorithm)),
expected_checksum,
finished: false,
}
}
}
impl<S> Stream for ChecksumVerifyingStream<S>
where
S: Stream<Item = Result<Bytes, reqwest::Error>> + Unpin,
{
type Item = io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
if self.finished {
return Poll::Ready(None);
}
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(bytes))) => {
if let Some(hasher) = &mut self.hasher {
hasher.update(&bytes);
}
Poll::Ready(Some(Ok(bytes)))
}
Poll::Ready(Some(Err(e))) => {
self.finished = true;
Poll::Ready(Some(Err(io::Error::other(e))))
}
Poll::Ready(None) => {
self.finished = true;
if let Some(hasher) = self.hasher.take() {
let computed = hasher.finalize();
if computed != self.expected_checksum {
return Poll::Ready(Some(Err(io::Error::other(format!(
"Checksum mismatch: expected {}, computed {}",
self.expected_checksum, computed
)))));
}
}
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
pub struct GetObjectResponse { pub struct GetObjectResponse {
request: S3Request, request: S3Request,
headers: HeaderMap, headers: HeaderMap,
body: Bytes, // Note: not used body: Bytes, // Note: not used
resp: reqwest::Response, resp: reqwest::Response,
verify_checksum: bool,
} }
impl_has_s3fields!(GetObjectResponse); impl_has_s3fields!(GetObjectResponse);
@ -45,11 +217,62 @@ impl HasRegion for GetObjectResponse {}
impl HasObject for GetObjectResponse {} impl HasObject for GetObjectResponse {}
impl HasVersion for GetObjectResponse {} impl HasVersion for GetObjectResponse {}
impl HasEtagFromHeaders for GetObjectResponse {} impl HasEtagFromHeaders for GetObjectResponse {}
impl HasChecksumHeaders for GetObjectResponse {}
impl GetObjectResponse { impl GetObjectResponse {
/// Checks if the checksum is a composite (multipart) checksum.
///
/// Composite checksums are returned for objects uploaded via multipart upload.
/// They represent a checksum-of-checksums and cannot be verified by computing
/// a checksum over the full object data.
///
/// Detection is based solely on the `x-amz-checksum-type: COMPOSITE` header.
/// We intentionally do NOT try to detect composite checksums by parsing the
/// checksum value for a `-N` suffix, as this could cause false positives if
/// the server uses base64url encoding (which includes `-` in its alphabet).
fn is_composite_checksum(&self) -> bool {
if let Some(checksum_type) = self.checksum_type()
&& checksum_type.eq_ignore_ascii_case("COMPOSITE")
{
return true;
}
false
}
/// Returns the content of the object as a (streaming) byte buffer. Note: consumes the response. /// Returns the content of the object as a (streaming) byte buffer. Note: consumes the response.
///
/// If `verify_checksum` is enabled and the server provided checksums, the stream will
/// automatically verify the checksum incrementally as data is read, maintaining streaming performance.
///
/// **Note on multipart objects**: Objects uploaded via multipart upload have COMPOSITE checksums
/// (checksum-of-checksums) which cannot be verified by computing a checksum over the downloaded
/// data. For these objects, checksum verification is automatically skipped.
pub fn content(self) -> Result<ObjectContent, Error> { pub fn content(self) -> Result<ObjectContent, Error> {
let content_length: u64 = self.object_size()?; let content_length: u64 = self.object_size()?;
// Skip verification for composite checksums (multipart uploads)
// Composite checksums are checksum-of-checksums and cannot be verified
// by computing a checksum over the full object data
if self.is_composite_checksum() {
log::debug!(
"Skipping checksum verification for composite checksum (multipart upload). \
Composite checksums cannot be verified without part boundaries."
);
let body = self.resp.bytes_stream().map_err(std::io::Error::other);
return Ok(ObjectContent::new_from_stream(body, Some(content_length)));
}
if let (true, Some(algorithm)) = (self.verify_checksum, self.detect_checksum_algorithm())
&& let Some(expected) = self.get_checksum(algorithm)
{
let stream = self.resp.bytes_stream();
let verifying_stream = ChecksumVerifyingStream::new(stream, algorithm, expected);
return Ok(ObjectContent::new_from_stream(
verifying_stream,
Some(content_length),
));
}
let body = self.resp.bytes_stream().map_err(std::io::Error::other); let body = self.resp.bytes_stream().map_err(std::io::Error::other);
Ok(ObjectContent::new_from_stream(body, Some(content_length))) Ok(ObjectContent::new_from_stream(body, Some(content_length)))
} }
@ -78,12 +301,68 @@ impl GetObjectResponse {
.map_err(|e| ValidationErr::HttpError(e).into()) .map_err(|e| ValidationErr::HttpError(e).into())
} }
/// Sets whether to automatically verify checksums when calling `content()`.
/// Default is `true`. Verification is performed incrementally during streaming with minimal overhead.
/// Set to `false` to disable checksum verification entirely.
pub fn with_verification(mut self, verify: bool) -> Self {
self.verify_checksum = verify;
self
}
/// Returns the content size (in Bytes) of the object. /// Returns the content size (in Bytes) of the object.
pub fn object_size(&self) -> Result<u64, ValidationErr> { pub fn object_size(&self) -> Result<u64, ValidationErr> {
self.resp self.resp
.content_length() .content_length()
.ok_or(ValidationErr::ContentLengthUnknown) .ok_or(ValidationErr::ContentLengthUnknown)
} }
/// Returns the content with automatic checksum verification.
///
/// Downloads the full content, computes its checksum, and verifies against server checksum.
///
/// **Note on multipart objects**: Objects uploaded via multipart upload have COMPOSITE checksums
/// (checksum-of-checksums) which cannot be verified by computing a checksum over the downloaded
/// data. For these objects, checksum verification is automatically skipped and the content is
/// returned without verification.
pub async fn content_verified(self) -> Result<Bytes, Error> {
// Skip verification for composite checksums (multipart uploads)
if self.is_composite_checksum() {
log::debug!(
"Skipping checksum verification for composite checksum (multipart upload). \
Composite checksums cannot be verified without part boundaries."
);
return self
.resp
.bytes()
.await
.map_err(|e| ValidationErr::HttpError(e).into());
}
let algorithm = self.detect_checksum_algorithm();
let expected_checksum = algorithm.and_then(|algo| self.get_checksum(algo));
let bytes = self.resp.bytes().await.map_err(ValidationErr::HttpError)?;
if let (Some(algo), Some(expected)) = (algorithm, expected_checksum) {
let computed = compute_checksum(algo, &bytes);
if computed != expected {
return Err(Error::Validation(ValidationErr::ChecksumMismatch {
expected,
computed,
}));
}
}
Ok(bytes)
}
/// Returns whether the object has a composite checksum (from multipart upload).
///
/// This can be used to check if checksum verification will be skipped.
pub fn has_composite_checksum(&self) -> bool {
self.is_composite_checksum()
}
} }
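// Hedged usage sketch: module paths (minio::s3::response, minio::s3::error) are
// assumed from this diff's imports; only the methods added above
// (has_composite_checksum, content_verified) are exercised.
async fn download_verified(
    resp: minio::s3::response::GetObjectResponse,
) -> Result<Vec<u8>, minio::s3::error::Error> {
    if resp.has_composite_checksum() {
        log::debug!("multipart object: full-object checksum verification will be skipped");
    }
    // Buffers the body, recomputes the checksum, and errors on mismatch.
    let bytes = resp.content_verified().await?;
    Ok(bytes.to_vec())
}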
#[async_trait] #[async_trait]
@ -98,6 +377,7 @@ impl FromS3Response for GetObjectResponse {
headers: mem::take(resp.headers_mut()), headers: mem::take(resp.headers_mut()),
body: Bytes::new(), body: Bytes::new(),
resp, resp,
verify_checksum: true, // Default to auto-verify
}) })
} }
} }
View File
@ -14,7 +14,9 @@
// limitations under the License. // limitations under the License.
use crate::s3::error::ValidationErr; use crate::s3::error::ValidationErr;
use crate::s3::response_traits::{HasBucket, HasEtagFromHeaders, HasObject, HasRegion, HasVersion}; use crate::s3::response_traits::{
HasBucket, HasChecksumHeaders, HasEtagFromHeaders, HasObject, HasRegion, HasVersion,
};
use crate::s3::types::S3Request; use crate::s3::types::S3Request;
use crate::s3::utils::get_text_result; use crate::s3::utils::get_text_result;
use crate::{impl_from_s3response, impl_from_s3response_with_size, impl_has_s3fields}; use crate::{impl_from_s3response, impl_from_s3response_with_size, impl_has_s3fields};
@ -40,6 +42,7 @@ impl HasObject for S3Response1 {}
impl HasRegion for S3Response1 {} impl HasRegion for S3Response1 {}
impl HasVersion for S3Response1 {} impl HasVersion for S3Response1 {}
impl HasEtagFromHeaders for S3Response1 {} impl HasEtagFromHeaders for S3Response1 {}
impl HasChecksumHeaders for S3Response1 {}
/// Extended response struct for operations that need additional data like object size /// Extended response struct for operations that need additional data like object size
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -60,6 +63,7 @@ impl HasObject for S3Response1WithSize {}
impl HasRegion for S3Response1WithSize {} impl HasRegion for S3Response1WithSize {}
impl HasVersion for S3Response1WithSize {} impl HasVersion for S3Response1WithSize {}
impl HasEtagFromHeaders for S3Response1WithSize {} impl HasEtagFromHeaders for S3Response1WithSize {}
impl HasChecksumHeaders for S3Response1WithSize {}
impl S3Response1WithSize { impl S3Response1WithSize {
pub fn new(response: S3Response1, object_size: u64) -> Self { pub fn new(response: S3Response1, object_size: u64) -> Self {
@ -93,6 +97,7 @@ impl HasObject for S3MultipartResponse {}
impl HasRegion for S3MultipartResponse {} impl HasRegion for S3MultipartResponse {}
impl HasVersion for S3MultipartResponse {} impl HasVersion for S3MultipartResponse {}
impl HasEtagFromHeaders for S3MultipartResponse {} impl HasEtagFromHeaders for S3MultipartResponse {}
impl HasChecksumHeaders for S3MultipartResponse {}
impl S3MultipartResponse { impl S3MultipartResponse {
/// Returns the upload ID for the multipart upload, while consuming the response. /// Returns the upload ID for the multipart upload, while consuming the response.
View File
@ -16,8 +16,8 @@
use crate::s3::error::ValidationErr; use crate::s3::error::ValidationErr;
use crate::s3::header_constants::*; use crate::s3::header_constants::*;
use crate::s3::response_traits::{ use crate::s3::response_traits::{
HasBucket, HasEtagFromHeaders, HasIsDeleteMarker, HasObject, HasObjectSize, HasRegion, HasBucket, HasChecksumHeaders, HasEtagFromHeaders, HasIsDeleteMarker, HasObject, HasObjectSize,
HasS3Fields, HasVersion, HasRegion, HasS3Fields, HasVersion,
}; };
use crate::s3::types::S3Request; use crate::s3::types::S3Request;
use crate::s3::types::{RetentionMode, parse_legal_hold}; use crate::s3::types::{RetentionMode, parse_legal_hold};
@ -46,6 +46,7 @@ impl HasRegion for StatObjectResponse {}
impl HasObject for StatObjectResponse {} impl HasObject for StatObjectResponse {}
impl HasEtagFromHeaders for StatObjectResponse {} impl HasEtagFromHeaders for StatObjectResponse {}
impl HasIsDeleteMarker for StatObjectResponse {} impl HasIsDeleteMarker for StatObjectResponse {}
impl HasChecksumHeaders for StatObjectResponse {}
impl HasVersion for StatObjectResponse {} impl HasVersion for StatObjectResponse {}
impl HasObjectSize for StatObjectResponse {} impl HasObjectSize for StatObjectResponse {}
View File
@ -67,7 +67,7 @@
use crate::s3::error::ValidationErr; use crate::s3::error::ValidationErr;
use crate::s3::header_constants::*; use crate::s3::header_constants::*;
use crate::s3::types::S3Request; use crate::s3::types::S3Request;
use crate::s3::utils::{get_text_result, parse_bool, trim_quotes}; use crate::s3::utils::{ChecksumAlgorithm, get_text_result, parse_bool, trim_quotes};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use http::HeaderMap; use http::HeaderMap;
use std::collections::HashMap; use std::collections::HashMap;
@ -275,3 +275,90 @@ pub trait HasTagging: HasS3Fields {
Ok(tags) Ok(tags)
} }
} }
/// Provides checksum-related methods for S3 responses with headers.
///
/// This trait provides default implementations for extracting and detecting checksums
/// from S3 response headers. Implement this trait for any response type that has
/// `HeaderMap` access via `HasS3Fields`.
pub trait HasChecksumHeaders: HasS3Fields {
/// Extracts the checksum value from response headers for the specified algorithm.
///
/// Retrieves the base64-encoded checksum value from the appropriate S3 response header
/// (x-amz-checksum-crc32, x-amz-checksum-crc32c, x-amz-checksum-crc64nvme,
/// x-amz-checksum-sha1, or x-amz-checksum-sha256).
///
/// # Arguments
///
/// * `algorithm` - The checksum algorithm to retrieve
///
/// # Returns
///
/// - `Some(checksum)` if the header is present, containing the base64-encoded checksum value
/// - `None` if the header is not found
///
/// # Use Cases
///
/// - Compare with locally computed checksums for manual verification
/// - Store checksum values for audit or compliance records
/// - Verify integrity after downloading to disk
#[inline]
fn get_checksum(&self, algorithm: ChecksumAlgorithm) -> Option<String> {
let header_name = match algorithm {
ChecksumAlgorithm::CRC32 => X_AMZ_CHECKSUM_CRC32,
ChecksumAlgorithm::CRC32C => X_AMZ_CHECKSUM_CRC32C,
ChecksumAlgorithm::SHA1 => X_AMZ_CHECKSUM_SHA1,
ChecksumAlgorithm::SHA256 => X_AMZ_CHECKSUM_SHA256,
ChecksumAlgorithm::CRC64NVME => X_AMZ_CHECKSUM_CRC64NVME,
};
self.headers()
.get(header_name)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
}
/// Returns the checksum type from response headers.
///
/// The checksum type indicates whether the checksum represents:
/// - `FULL_OBJECT` - A checksum computed over the entire object
/// - `COMPOSITE` - A checksum-of-checksums for multipart uploads
///
/// # Returns
///
/// - `Some(type_string)` if the `x-amz-checksum-type` header is present
/// - `None` if the header is not found
#[inline]
fn checksum_type(&self) -> Option<String> {
self.headers()
.get(X_AMZ_CHECKSUM_TYPE)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string())
}
/// Detects which checksum algorithm was used by the server (if any).
///
/// Examines response headers to determine if the server computed a checksum
/// for this operation.
///
/// # Returns
///
/// - `Some(algorithm)` if a checksum header is found (CRC32, CRC32C, CRC64NVME, SHA1, or SHA256)
/// - `None` if no checksum headers are present
#[inline]
fn detect_checksum_algorithm(&self) -> Option<ChecksumAlgorithm> {
if self.headers().contains_key(X_AMZ_CHECKSUM_CRC32) {
Some(ChecksumAlgorithm::CRC32)
} else if self.headers().contains_key(X_AMZ_CHECKSUM_CRC32C) {
Some(ChecksumAlgorithm::CRC32C)
} else if self.headers().contains_key(X_AMZ_CHECKSUM_CRC64NVME) {
Some(ChecksumAlgorithm::CRC64NVME)
} else if self.headers().contains_key(X_AMZ_CHECKSUM_SHA1) {
Some(ChecksumAlgorithm::SHA1)
} else if self.headers().contains_key(X_AMZ_CHECKSUM_SHA256) {
Some(ChecksumAlgorithm::SHA256)
} else {
None
}
}
}
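// Hedged sketch: a caller-side helper generic over any response type that implements
// the trait above (e.g. StatObjectResponse or GetObjectResponse from this diff).
fn log_server_checksum<R: HasChecksumHeaders>(resp: &R) {
    match resp.detect_checksum_algorithm() {
        Some(algorithm) => match resp.get_checksum(algorithm) {
            Some(value) => log::debug!("server checksum {} = {}", algorithm.as_str(), value),
            None => log::debug!("checksum header for {} had no readable value", algorithm.as_str()),
        },
        None => log::debug!("no checksum headers on this response"),
    }
}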
View File
@ -394,6 +394,155 @@ pub(crate) fn post_presign_v4(
get_signature(&signing_key, string_to_sign.as_bytes()) get_signature(&signing_key, string_to_sign.as_bytes())
} }
// ===========================
// Chunk Signing for STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER
// ===========================
/// Context required for signing streaming chunks.
///
/// This struct captures the parameters needed to sign each chunk
/// in STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER uploads.
/// The signing key can be reused for all chunks since it only depends
/// on date/region/service which don't change within a request.
#[derive(Debug, Clone)]
pub struct ChunkSigningContext {
/// The signing key (Arc for zero-copy sharing across chunks)
pub signing_key: Arc<[u8]>,
/// AMZ date format: 20241219T120000Z
pub date_time: String,
/// Credential scope: 20241219/us-east-1/s3/aws4_request
pub scope: String,
/// The seed signature from Authorization header (hex-encoded)
pub seed_signature: String,
}
/// Signs a single chunk for STREAMING-AWS4-HMAC-SHA256-PAYLOAD(-TRAILER) uploads.
///
/// The string-to-sign format is:
/// ```text
/// AWS4-HMAC-SHA256-PAYLOAD
/// <timestamp>
/// <scope>
/// <previous-signature>
/// <SHA256-of-empty-string>
/// <SHA256-of-chunk-data>
/// ```
///
/// # Arguments
/// * `signing_key` - The derived signing key
/// * `date_time` - AMZ date format (e.g., "20130524T000000Z")
/// * `scope` - Credential scope (e.g., "20130524/us-east-1/s3/aws4_request")
/// * `previous_signature` - Previous chunk's signature (or seed signature for first chunk)
/// * `chunk_hash` - Pre-computed SHA256 hash of the chunk data (hex-encoded)
///
/// # Returns
/// The hex-encoded signature for this chunk.
pub fn sign_chunk(
signing_key: &[u8],
date_time: &str,
scope: &str,
previous_signature: &str,
chunk_hash: &str,
) -> String {
// SHA256 of empty string (constant per AWS spec)
const EMPTY_SHA256: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
// Build string-to-sign per AWS spec
let string_to_sign = format!(
"AWS4-HMAC-SHA256-PAYLOAD\n{}\n{}\n{}\n{}\n{}",
date_time, scope, previous_signature, EMPTY_SHA256, chunk_hash
);
hmac_hash_hex(signing_key, string_to_sign.as_bytes())
}
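// Hedged standalone sketch of the HMAC step behind sign_chunk. The library uses its
// own hmac helper internally; this version assumes the hmac and sha2 crates and
// hex-formats the tag manually. Inputs mirror the parameters documented above.
fn chunk_signature_sketch(
    signing_key: &[u8],
    date_time: &str,
    scope: &str,
    previous_signature: &str,
    chunk_hash: &str,
) -> String {
    use hmac::{Hmac, Mac};
    use sha2::Sha256;

    const EMPTY_SHA256: &str =
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
    let string_to_sign = format!(
        "AWS4-HMAC-SHA256-PAYLOAD\n{date_time}\n{scope}\n{previous_signature}\n{EMPTY_SHA256}\n{chunk_hash}"
    );
    let mut mac =
        Hmac::<Sha256>::new_from_slice(signing_key).expect("HMAC accepts any key length");
    mac.update(string_to_sign.as_bytes());
    mac.finalize()
        .into_bytes()
        .iter()
        .map(|b| format!("{b:02x}"))
        .collect()
}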
/// Signs the trailer for STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER.
///
/// The string-to-sign format is:
/// ```text
/// AWS4-HMAC-SHA256-TRAILER
/// <timestamp>
/// <scope>
/// <last-chunk-signature>
/// <SHA256-of-canonical-trailers>
/// ```
///
/// # Arguments
/// * `signing_key` - The derived signing key
/// * `date_time` - AMZ date format (e.g., "20130524T000000Z")
/// * `scope` - Credential scope (e.g., "20130524/us-east-1/s3/aws4_request")
/// * `last_chunk_signature` - The signature of the final 0-byte chunk
/// * `canonical_trailers_hash` - Pre-computed SHA256 hash of canonical trailers (hex-encoded)
///
/// The canonical trailers format is: `<lowercase-header-name>:<trimmed-value>\n`
/// Example: `x-amz-checksum-crc32c:sOO8/Q==\n`
///
/// **Note**: The canonical form uses LF (`\n`), not CRLF (`\r\n`), even though
/// the HTTP wire format uses CRLF. This is per AWS SigV4 specification.
///
/// # Returns
/// The hex-encoded trailer signature.
pub fn sign_trailer(
signing_key: &[u8],
date_time: &str,
scope: &str,
last_chunk_signature: &str,
canonical_trailers_hash: &str,
) -> String {
// Build string-to-sign per AWS spec
let string_to_sign = format!(
"AWS4-HMAC-SHA256-TRAILER\n{}\n{}\n{}\n{}",
date_time, scope, last_chunk_signature, canonical_trailers_hash
);
hmac_hash_hex(signing_key, string_to_sign.as_bytes())
}
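// Hedged sketch: building the canonical trailer (lowercase name, trimmed value,
// LF-terminated, per the note above) and hashing it to obtain the
// `canonical_trailers_hash` argument. Assumes the sha2 crate; the example value
// "sOO8/Q==" is made up.
fn canonical_trailers_hash_sketch(trailer_name: &str, trailer_value: &str) -> String {
    use sha2::{Digest, Sha256};
    let canonical = format!("{}:{}\n", trailer_name.to_ascii_lowercase(), trailer_value.trim());
    Sha256::digest(canonical.as_bytes())
        .iter()
        .map(|b| format!("{b:02x}"))
        .collect()
}
// e.g. canonical_trailers_hash_sketch("x-amz-checksum-crc32c", "sOO8/Q==")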
/// Signs request and returns context for chunk signing.
///
/// This is used for STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER uploads
/// where the encoder needs the signing key and seed signature to sign
/// each chunk during streaming.
///
/// The `cache` parameter should be the per-client `signing_key_cache` from `SharedClientItems`.
pub(crate) fn sign_v4_s3_with_context(
cache: &RwLock<SigningKeyCache>,
method: &Method,
uri: &str,
region: &str,
headers: &mut Multimap,
query_params: &Multimap,
access_key: &str,
secret_key: &str,
content_sha256: &str,
date: UtcTime,
) -> ChunkSigningContext {
let scope = get_scope(date, region, "s3");
let (signed_headers, canonical_headers) = headers.get_canonical_headers();
let canonical_query_string = query_params.get_canonical_query_string();
let canonical_request_hash = get_canonical_request_hash(
method,
uri,
&canonical_query_string,
&canonical_headers,
&signed_headers,
content_sha256,
);
let string_to_sign = get_string_to_sign(date, &scope, &canonical_request_hash);
let signing_key = get_signing_key(cache, secret_key, date, region, "s3");
let signature = get_signature(&signing_key, string_to_sign.as_bytes());
let authorization = get_authorization(access_key, &scope, &signed_headers, &signature);
headers.add(AUTHORIZATION, authorization);
ChunkSigningContext {
signing_key,
date_time: to_amz_date(date),
scope,
seed_signature: signature,
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -762,4 +911,274 @@ mod tests {
assert_eq!(sig1, sig2); assert_eq!(sig1, sig2);
} }
// ===========================
// Chunk Signing Tests
// ===========================
#[test]
fn test_sign_chunk_produces_valid_signature() {
// Use signing key derived from AWS test credentials
let cache = test_cache();
let secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
let date = get_test_date();
let region = "us-east-1";
let signing_key = get_signing_key(&cache, secret_key, date, region, "s3");
let date_time = "20130524T000000Z";
let scope = "20130524/us-east-1/s3/aws4_request";
let previous_signature = "4f232c4386841ef735655705268965c44a0e4690baa4adea153f7db9fa80a0a9";
// SHA256 of some test data
let chunk_hash = "bf718b6f653bebc184e1479f1935b8da974d701b893afcf49e701f3e2f9f9c5a";
let signature = sign_chunk(
&signing_key,
date_time,
scope,
previous_signature,
chunk_hash,
);
// Should produce 64 character hex signature
assert_eq!(signature.len(), 64);
assert!(signature.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn test_sign_chunk_deterministic() {
let cache = test_cache();
let secret_key = "test_secret";
let date = get_test_date();
let region = "us-east-1";
let signing_key = get_signing_key(&cache, secret_key, date, region, "s3");
let date_time = "20130524T000000Z";
let scope = "20130524/us-east-1/s3/aws4_request";
let previous_signature = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234";
let chunk_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
let sig1 = sign_chunk(
&signing_key,
date_time,
scope,
previous_signature,
chunk_hash,
);
let sig2 = sign_chunk(
&signing_key,
date_time,
scope,
previous_signature,
chunk_hash,
);
assert_eq!(sig1, sig2);
}
#[test]
fn test_sign_chunk_empty_data() {
let cache = test_cache();
let secret_key = "test_secret";
let date = get_test_date();
let region = "us-east-1";
let signing_key = get_signing_key(&cache, secret_key, date, region, "s3");
let date_time = "20130524T000000Z";
let scope = "20130524/us-east-1/s3/aws4_request";
let previous_signature = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234";
// SHA256 of empty data
let chunk_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
let signature = sign_chunk(
&signing_key,
date_time,
scope,
previous_signature,
chunk_hash,
);
// Should still produce valid signature for empty chunk
assert_eq!(signature.len(), 64);
assert!(signature.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn test_sign_chunk_chaining() {
let cache = test_cache();
let secret_key = "test_secret";
let date = get_test_date();
let region = "us-east-1";
let signing_key = get_signing_key(&cache, secret_key, date, region, "s3");
let date_time = "20130524T000000Z";
let scope = "20130524/us-east-1/s3/aws4_request";
let seed_signature = "seed1234seed1234seed1234seed1234seed1234seed1234seed1234seed1234";
let chunk1_hash = "aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111aaaa1111";
let chunk2_hash = "bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222bbbb2222";
// Sign first chunk with seed signature
let chunk1_sig = sign_chunk(&signing_key, date_time, scope, seed_signature, chunk1_hash);
// Sign second chunk with first chunk's signature
let chunk2_sig = sign_chunk(&signing_key, date_time, scope, &chunk1_sig, chunk2_hash);
// Signatures should be different
assert_ne!(chunk1_sig, chunk2_sig);
// Both should be valid hex
assert_eq!(chunk1_sig.len(), 64);
assert_eq!(chunk2_sig.len(), 64);
}
#[test]
fn test_sign_trailer_produces_valid_signature() {
let cache = test_cache();
let secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
let date = get_test_date();
let region = "us-east-1";
let signing_key = get_signing_key(&cache, secret_key, date, region, "s3");
let date_time = "20130524T000000Z";
let scope = "20130524/us-east-1/s3/aws4_request";
let last_chunk_signature =
"b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9";
// SHA256 of "x-amz-checksum-crc32c:sOO8/Q==\n"
let canonical_trailers_hash =
"1e376db7e1a34a8ef1c4bcee131a2d60a1cb62503747488624e10995f448d774";
let signature = sign_trailer(
&signing_key,
date_time,
scope,
last_chunk_signature,
canonical_trailers_hash,
);
// Should produce 64 character hex signature
assert_eq!(signature.len(), 64);
assert!(signature.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn test_sign_trailer_deterministic() {
let cache = test_cache();
let secret_key = "test_secret";
let date = get_test_date();
let region = "us-east-1";
let signing_key = get_signing_key(&cache, secret_key, date, region, "s3");
let date_time = "20130524T000000Z";
let scope = "20130524/us-east-1/s3/aws4_request";
let last_chunk_signature =
"abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234";
let canonical_trailers_hash =
"1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd";
let sig1 = sign_trailer(
&signing_key,
date_time,
scope,
last_chunk_signature,
canonical_trailers_hash,
);
let sig2 = sign_trailer(
&signing_key,
date_time,
scope,
last_chunk_signature,
canonical_trailers_hash,
);
assert_eq!(sig1, sig2);
}
#[test]
fn test_sign_v4_s3_with_context_returns_valid_context() {
let cache = test_cache();
let method = Method::PUT;
let uri = "/examplebucket/chunkObject.txt";
let region = "us-east-1";
let mut headers = Multimap::new();
let date = get_test_date();
let content_sha256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";
let access_key = "AKIAIOSFODNN7EXAMPLE";
let secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY";
headers.add(HOST, "s3.amazonaws.com");
headers.add(X_AMZ_CONTENT_SHA256, content_sha256);
headers.add(X_AMZ_DATE, "20130524T000000Z");
headers.add("Content-Encoding", "aws-chunked");
headers.add("x-amz-decoded-content-length", "66560");
headers.add("x-amz-trailer", "x-amz-checksum-crc32c");
let query_params = Multimap::new();
let context = sign_v4_s3_with_context(
&cache,
&method,
uri,
region,
&mut headers,
&query_params,
access_key,
secret_key,
content_sha256,
date,
);
// Authorization header should be added
assert!(headers.contains_key("Authorization"));
let auth_header = headers.get("Authorization").unwrap();
assert!(auth_header.starts_with("AWS4-HMAC-SHA256"));
// Context should have valid values
assert!(!context.signing_key.is_empty());
assert_eq!(context.date_time, "20130524T000000Z");
assert_eq!(context.scope, "20130524/us-east-1/s3/aws4_request");
assert_eq!(context.seed_signature.len(), 64);
assert!(
context
.seed_signature
.chars()
.all(|c| c.is_ascii_hexdigit())
);
}
#[test]
fn test_chunk_signing_context_can_be_cloned() {
let cache = test_cache();
let method = Method::PUT;
let uri = "/test";
let region = "us-east-1";
let mut headers = Multimap::new();
let date = get_test_date();
let content_sha256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";
let access_key = "test";
let secret_key = "secret";
headers.add(HOST, "s3.amazonaws.com");
headers.add(X_AMZ_CONTENT_SHA256, content_sha256);
headers.add(X_AMZ_DATE, "20130524T000000Z");
let query_params = Multimap::new();
let context = sign_v4_s3_with_context(
&cache,
&method,
uri,
region,
&mut headers,
&query_params,
access_key,
secret_key,
content_sha256,
date,
);
// Should be cloneable (required for async streams)
let cloned = context.clone();
assert_eq!(context.date_time, cloned.date_time);
assert_eq!(context.scope, cloned.scope);
assert_eq!(context.seed_signature, cloned.seed_signature);
}
} }
View File
@ -16,7 +16,7 @@
//! Basic S3 data types: ListEntry, Bucket, Part, Retention, etc. //! Basic S3 data types: ListEntry, Bucket, Part, Retention, etc.
use crate::s3::error::ValidationErr; use crate::s3::error::ValidationErr;
use crate::s3::utils::UtcTime; use crate::s3::utils::{ChecksumAlgorithm, UtcTime};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
@ -53,11 +53,34 @@ pub struct Part {
pub etag: String, pub etag: String,
} }
/// Contains part information for multipart uploads including optional checksum.
///
/// Only one checksum algorithm is active per upload, so the checksum is stored
/// as an optional tuple of (algorithm, base64-encoded value).
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PartInfo { pub struct PartInfo {
pub number: u16, pub number: u16,
pub etag: String, pub etag: String,
pub size: u64, pub size: u64,
/// Optional checksum for this part: (algorithm, base64-encoded value)
pub checksum: Option<(ChecksumAlgorithm, String)>,
}
impl PartInfo {
/// Creates a new PartInfo.
pub fn new(
number: u16,
etag: String,
size: u64,
checksum: Option<(ChecksumAlgorithm, String)>,
) -> Self {
Self {
number,
etag,
size,
checksum,
}
}
} }
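// Hedged usage sketch: capturing the checksum a part upload returned so it can be
// echoed back in CompleteMultipartUpload. The ETag, size and checksum values are
// made up; ChecksumAlgorithm is already imported at the top of this file.
fn example_part() -> PartInfo {
    PartInfo::new(
        1,
        "9b2cf535f27731c974343645a3985328".to_string(),
        8 * 1024 * 1024,
        Some((ChecksumAlgorithm::CRC32C, "sOO8/Q==".to_string())),
    )
}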
#[derive(PartialEq, Clone, Debug)] #[derive(PartialEq, Clone, Debug)]
View File
@ -105,3 +105,31 @@ pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: &str =
pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str =
"X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key-MD5"; "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key-MD5";
pub const X_AMZ_CHECKSUM_ALGORITHM: &str = "X-Amz-Checksum-Algorithm";
pub const X_AMZ_CHECKSUM_CRC32: &str = "X-Amz-Checksum-CRC32";
pub const X_AMZ_CHECKSUM_CRC32C: &str = "X-Amz-Checksum-CRC32C";
pub const X_AMZ_CHECKSUM_SHA1: &str = "X-Amz-Checksum-SHA1";
pub const X_AMZ_CHECKSUM_SHA256: &str = "X-Amz-Checksum-SHA256";
pub const X_AMZ_CHECKSUM_CRC64NVME: &str = "X-Amz-Checksum-CRC64NVME";
pub const X_AMZ_CHECKSUM_TYPE: &str = "X-Amz-Checksum-Type";
pub const X_AMZ_TRAILER: &str = "X-Amz-Trailer";
pub const X_AMZ_DECODED_CONTENT_LENGTH: &str = "X-Amz-Decoded-Content-Length";
pub const CONTENT_ENCODING: &str = "Content-Encoding";
/// Content-SHA256 value for streaming uploads with unsigned payload and trailing checksum
pub const STREAMING_UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER";
/// Content-SHA256 value for streaming uploads with signed payload and trailing checksum.
/// Each chunk is signed with AWS Signature V4, and the trailer includes a trailer signature.
pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD_TRAILER: &str =
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER";
View File
@ -19,6 +19,7 @@ use super::super::client::{DEFAULT_REGION, MinioClient};
use crate::s3::error::Error; use crate::s3::error::Error;
use crate::s3::multimap_ext::Multimap; use crate::s3::multimap_ext::Multimap;
use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::utils::ChecksumAlgorithm;
use http::Method; use http::Method;
use std::sync::Arc; use std::sync::Arc;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
@ -50,6 +51,20 @@ pub struct S3Request {
#[builder(default, setter(into))] #[builder(default, setter(into))]
body: Option<Arc<SegmentedBytes>>, body: Option<Arc<SegmentedBytes>>,
/// Optional trailing checksum algorithm for streaming uploads.
///
/// When set, the request body will be sent using aws-chunked encoding
/// with the checksum computed incrementally and appended as a trailer.
#[builder(default)]
pub(crate) trailing_checksum: Option<ChecksumAlgorithm>,
/// When true and trailing checksums are enabled, signs each chunk with AWS Signature V4.
///
/// Uses STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER where each chunk is signed
/// and the trailer includes a trailer signature.
#[builder(default = false)]
pub(crate) use_signed_streaming: bool,
/// region computed by [`S3Request::execute`] /// region computed by [`S3Request::execute`]
#[builder(default, setter(skip))] #[builder(default, setter(skip))]
pub(crate) inner_region: String, pub(crate) inner_region: String,
@ -76,6 +91,8 @@ impl S3Request {
&self.bucket.as_deref(), &self.bucket.as_deref(),
&self.object.as_deref(), &self.object.as_deref(),
self.body.as_ref().map(Arc::clone), self.body.as_ref().map(Arc::clone),
self.trailing_checksum,
self.use_signed_streaming,
) )
.await .await
} }
View File
@ -20,19 +20,21 @@ use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::sse::{Sse, SseCustomerKey};
use base64::engine::Engine as _; use base64::engine::Engine as _;
use chrono::{DateTime, Datelike, NaiveDateTime, Utc}; use chrono::{DateTime, Datelike, NaiveDateTime, Utc};
use crc::{CRC_32_ISO_HDLC, Crc}; use crc_fast::{CrcAlgorithm, Digest as CrcFastDigest, checksum as crc_fast_checksum};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_decode_str, utf8_percent_encode}; use percent_encoding::{AsciiSet, NON_ALPHANUMERIC, percent_decode_str, utf8_percent_encode};
use regex::Regex; use regex::Regex;
#[cfg(feature = "ring")] #[cfg(feature = "ring")]
use ring::digest::{Context, SHA256}; use ring::digest::{Context, SHA256};
use sha1::{Digest as Sha1Digest, Sha1};
#[cfg(not(feature = "ring"))] #[cfg(not(feature = "ring"))]
use sha2::{Digest, Sha256}; use sha2::Sha256;
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use xmltree::Element; use xmltree::Element;
/// Date and time with UTC timezone /// Date and time with UTC timezone.
pub type UtcTime = DateTime<Utc>; pub type UtcTime = DateTime<Utc>;
// Great stuff to get confused about. // Great stuff to get confused about.
@ -60,10 +62,20 @@ pub fn b64_encode(input: impl AsRef<[u8]>) -> String {
base64::engine::general_purpose::STANDARD.encode(input) base64::engine::general_purpose::STANDARD.encode(input)
} }
/// Computes CRC32 of given data. /// Computes CRC32 of given data using a hardware-accelerated SIMD implementation.
///
/// Uses crc-fast which provides hardware acceleration via PCLMULQDQ/CLMUL instructions
/// on modern CPUs, achieving >50 GiB/s throughput (vs ~0.5 GiB/s for software).
pub fn crc32(data: &[u8]) -> u32 { pub fn crc32(data: &[u8]) -> u32 {
//TODO creating a new Crc object is expensive, we should cache it crc_fast_checksum(CrcAlgorithm::Crc32IsoHdlc, data) as u32
Crc::<u32>::new(&CRC_32_ISO_HDLC).checksum(data) }
/// Computes CRC64-NVME of given data using a hardware-accelerated SIMD implementation.
///
/// Uses crc-fast which provides hardware acceleration via PCLMULQDQ/CLMUL instructions
/// on modern CPUs, achieving >50 GiB/s throughput (vs ~0.5 GiB/s for software).
pub fn crc64nvme(data: &[u8]) -> u64 {
crc_fast_checksum(CrcAlgorithm::Crc64Nvme, data)
} }
/// Converts data array into 32-bit BigEndian unsigned int. /// Converts data array into 32-bit BigEndian unsigned int.
@ -168,9 +180,252 @@ pub fn sha256_hash_sb(sb: Arc<SegmentedBytes>) -> String {
} }
} }
/// S3 checksum algorithms supported by the API
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumAlgorithm {
CRC32,
CRC32C,
SHA1,
SHA256,
CRC64NVME,
}
impl ChecksumAlgorithm {
/// Returns the AWS header value for this checksum algorithm.
pub fn as_str(&self) -> &'static str {
match self {
ChecksumAlgorithm::CRC32 => "CRC32",
ChecksumAlgorithm::CRC32C => "CRC32C",
ChecksumAlgorithm::SHA1 => "SHA1",
ChecksumAlgorithm::SHA256 => "SHA256",
ChecksumAlgorithm::CRC64NVME => "CRC64NVME",
}
}
/// Returns the HTTP header name for this checksum algorithm (e.g., "X-Amz-Checksum-CRC32").
pub fn header_name(&self) -> &'static str {
use crate::s3::types::header_constants::*;
match self {
ChecksumAlgorithm::CRC32 => X_AMZ_CHECKSUM_CRC32,
ChecksumAlgorithm::CRC32C => X_AMZ_CHECKSUM_CRC32C,
ChecksumAlgorithm::SHA1 => X_AMZ_CHECKSUM_SHA1,
ChecksumAlgorithm::SHA256 => X_AMZ_CHECKSUM_SHA256,
ChecksumAlgorithm::CRC64NVME => X_AMZ_CHECKSUM_CRC64NVME,
}
}
}
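// Illustrative sketch, not part of this commit: one way `as_str()` and `header_name()`
// could be combined when attaching a checksum to an outgoing request. The helper name
// `example_checksum_headers` and the "x-amz-checksum-algorithm" header are shown only
// for illustration; the real request plumbing lives elsewhere in the crate.
fn example_checksum_headers(algorithm: ChecksumAlgorithm, encoded: &str) -> Vec<(String, String)> {
    vec![
        // e.g. ("x-amz-checksum-algorithm", "CRC32C")
        ("x-amz-checksum-algorithm".to_string(), algorithm.as_str().to_string()),
        // e.g. (X_AMZ_CHECKSUM_CRC32C, "<base64 value>")
        (algorithm.header_name().to_string(), encoded.to_string()),
    ]
}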
/// Parses a checksum algorithm name from a string.
///
/// Case-insensitive parsing of S3 checksum algorithm names. Useful for parsing
/// header values or configuration strings.
///
/// # Supported Values
///
/// - `"CRC32"` / `"crc32"` - Standard CRC32 checksum
/// - `"CRC32C"` / `"crc32c"` - CRC32C (Castagnoli) checksum
/// - `"SHA1"` / `"sha1"` - SHA-1 hash
/// - `"SHA256"` / `"sha256"` - SHA-256 hash
/// - `"CRC64NVME"` / `"crc64nvme"` - CRC-64/NVME checksum
///
/// # Errors
///
/// Returns an error string if the algorithm name is not recognized.
impl FromStr for ChecksumAlgorithm {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_uppercase().as_str() {
"CRC32" => Ok(ChecksumAlgorithm::CRC32),
"CRC32C" => Ok(ChecksumAlgorithm::CRC32C),
"SHA1" => Ok(ChecksumAlgorithm::SHA1),
"SHA256" => Ok(ChecksumAlgorithm::SHA256),
"CRC64NVME" => Ok(ChecksumAlgorithm::CRC64NVME),
_ => Err(format!("Unknown checksum algorithm: {}", s)),
}
}
}
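// Illustrative sketch, not part of this commit: parsing is case-insensitive, so header
// or configuration values can be fed straight into `str::parse`.
fn example_parse_algorithm() {
    let algo: ChecksumAlgorithm = "crc64nvme".parse().expect("known algorithm name");
    assert_eq!(algo, ChecksumAlgorithm::CRC64NVME);
    // MD5 is not an S3 checksum algorithm, so parsing it fails.
    assert!("md5".parse::<ChecksumAlgorithm>().is_err());
}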
/// Computes CRC32C checksum (Castagnoli polynomial) and returns base64-encoded value.
///
/// Uses crc-fast which provides hardware acceleration via PCLMULQDQ/CLMUL instructions
/// on modern CPUs, achieving >50 GiB/s throughput.
pub fn crc32c(data: &[u8]) -> String {
let checksum = crc_fast_checksum(CrcAlgorithm::Crc32Iscsi, data) as u32;
b64_encode(checksum.to_be_bytes())
}
/// Computes SHA1 hash and returns base64-encoded value
pub fn sha1_hash(data: &[u8]) -> String {
let mut hasher = Sha1::new();
hasher.update(data);
let result = hasher.finalize();
b64_encode(&result[..])
}
/// Computes SHA256 hash and returns base64-encoded value (for checksums, not authentication)
pub fn sha256_checksum(data: &[u8]) -> String {
#[cfg(feature = "ring")]
{
b64_encode(ring::digest::digest(&SHA256, data).as_ref())
}
#[cfg(not(feature = "ring"))]
{
let result = Sha256::new_with_prefix(data).finalize();
b64_encode(&result[..])
}
}
/// Computes CRC32 checksum and returns base64-encoded value
pub fn crc32_checksum(data: &[u8]) -> String {
b64_encode(crc32(data).to_be_bytes())
}
/// Computes CRC64-NVME checksum and returns base64-encoded value
pub fn crc64nvme_checksum(data: &[u8]) -> String {
b64_encode(crc64nvme(data).to_be_bytes())
}
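// Illustrative sketch, not part of this commit: the *_checksum helpers are simply the raw
// CRC value, big-endian encoded, then base64 encoded. Assuming the standard CRC-32/ISO-HDLC
// check value 0xCBF43926 for the input "123456789", the encoded form would be "y/Q5Jg==".
fn example_crc32_encoding() {
    let raw: u32 = crc32(b"123456789"); // expected 0xCBF43926 per the standard check value
    assert_eq!(crc32_checksum(b"123456789"), b64_encode(raw.to_be_bytes()));
}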
/// Computes checksum based on the specified algorithm for contiguous byte slices.
///
/// This function computes checksums on already-materialized `&[u8]` data. Use this when:
/// - Data is already in a contiguous buffer (e.g., from `reqwest::Response::bytes()`)
/// - Working with small byte arrays in tests
/// - Data comes from sources other than `SegmentedBytes`
///
/// **Performance Note**: If you have data in `SegmentedBytes`, use [`compute_checksum_sb`]
/// instead to avoid copying. Calling `.to_bytes()` on `SegmentedBytes` creates a full copy
/// of all segments, which is expensive for large objects (up to 5GB per part).
///
/// # Arguments
///
/// * `algorithm` - The checksum algorithm to use (CRC32, CRC32C, CRC64NVME, SHA1, SHA256)
/// * `data` - The contiguous byte slice to compute checksum over
///
/// # Returns
///
/// Base64-encoded checksum string suitable for S3 headers
///
/// # Example
///
/// ```
/// use minio::s3::utils::{compute_checksum, ChecksumAlgorithm};
///
/// let data = b"hello world";
/// let checksum = compute_checksum(ChecksumAlgorithm::CRC32C, data);
/// println!("CRC32C: {}", checksum);
/// ```
pub fn compute_checksum(algorithm: ChecksumAlgorithm, data: &[u8]) -> String {
match algorithm {
ChecksumAlgorithm::CRC32 => crc32_checksum(data),
ChecksumAlgorithm::CRC32C => crc32c(data),
ChecksumAlgorithm::SHA1 => sha1_hash(data),
ChecksumAlgorithm::SHA256 => sha256_checksum(data),
ChecksumAlgorithm::CRC64NVME => crc64nvme_checksum(data),
}
}
/// Computes checksum for `SegmentedBytes` without copying data (zero-copy streaming).
///
/// This function computes checksums by iterating over segments incrementally, avoiding
/// the need to materialize the entire buffer in contiguous memory. This is critical for
/// performance when working with large objects (up to 5GB per part in multipart uploads).
///
/// **Always use this function for `SegmentedBytes` data** instead of calling `.to_bytes()`
/// followed by `compute_checksum()`, which would create an expensive full copy.
///
/// # Performance Characteristics
///
/// - **Memory**: Only allocates hasher state (~64 bytes for SHA256, ~4-8 bytes for CRC)
/// - **CPU**: Hardware-accelerated where available (CRC32C with SSE 4.2)
/// - **Streaming**: Processes data incrementally without buffering
///
/// # Arguments
///
/// * `algorithm` - The checksum algorithm to use (CRC32, CRC32C, CRC64NVME, SHA1, SHA256)
/// * `sb` - The segmented bytes to compute checksum over (passed by reference, not consumed)
///
/// # Returns
///
/// Base64-encoded checksum string suitable for S3 headers
///
/// # Example
///
/// ```
/// use minio::s3::utils::{compute_checksum_sb, ChecksumAlgorithm};
/// use minio::s3::segmented_bytes::SegmentedBytes;
/// use std::sync::Arc;
/// use bytes::Bytes;
///
/// let mut sb = SegmentedBytes::new();
/// sb.append(Bytes::from("hello "));
/// sb.append(Bytes::from("world"));
/// let sb = Arc::new(sb);
///
/// let checksum = compute_checksum_sb(ChecksumAlgorithm::CRC32C, &sb);
/// println!("CRC32C: {}", checksum);
/// ```
///
/// # See Also
///
/// - [`compute_checksum`] - For already-contiguous `&[u8]` data
pub fn compute_checksum_sb(algorithm: ChecksumAlgorithm, sb: &Arc<SegmentedBytes>) -> String {
match algorithm {
ChecksumAlgorithm::CRC32 => {
let mut digest = CrcFastDigest::new(CrcAlgorithm::Crc32IsoHdlc);
for data in sb.iter() {
digest.update(data.as_ref());
}
b64_encode((digest.finalize() as u32).to_be_bytes())
}
ChecksumAlgorithm::CRC32C => {
let mut digest = CrcFastDigest::new(CrcAlgorithm::Crc32Iscsi);
for data in sb.iter() {
digest.update(data.as_ref());
}
b64_encode((digest.finalize() as u32).to_be_bytes())
}
ChecksumAlgorithm::SHA1 => {
let mut hasher = Sha1::new();
for data in sb.iter() {
hasher.update(data.as_ref());
}
let result = hasher.finalize();
b64_encode(&result[..])
}
ChecksumAlgorithm::SHA256 => {
#[cfg(feature = "ring")]
{
let mut context = Context::new(&SHA256);
for data in sb.iter() {
context.update(data.as_ref());
}
b64_encode(context.finish().as_ref())
}
#[cfg(not(feature = "ring"))]
{
let mut hasher = Sha256::new();
for data in sb.iter() {
hasher.update(data.as_ref());
}
let result = hasher.finalize();
b64_encode(&result[..])
}
}
ChecksumAlgorithm::CRC64NVME => {
let mut digest = CrcFastDigest::new(CrcAlgorithm::Crc64Nvme);
for data in sb.iter() {
digest.update(data.as_ref());
}
b64_encode(digest.finalize().to_be_bytes())
}
}
}
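// Illustrative sketch, not part of this commit: a hypothetical end-to-end flow that picks
// the algorithm from a string, computes the checksum over SegmentedBytes without copying,
// and pairs it with the matching header name. `checksum_header_for` is not a real API.
fn checksum_header_for(
    algorithm_name: &str,
    body: &Arc<SegmentedBytes>,
) -> Result<(&'static str, String), String> {
    let algorithm: ChecksumAlgorithm = algorithm_name.parse()?;
    Ok((algorithm.header_name(), compute_checksum_sb(algorithm, body)))
}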
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use std::collections::HashMap;
#[test]
@@ -290,6 +545,93 @@ mod tests {
assert_eq!(empty_hash, "1B2M2Y8AsgTpgAmY7PhCfg==");
}
#[test]
fn test_crc32c() {
let checksum = crc32c(b"hello");
assert!(!checksum.is_empty());
let checksum_empty = crc32c(b"");
assert!(!checksum_empty.is_empty());
let checksum_standard = crc32c(b"123456789");
assert!(!checksum_standard.is_empty());
}
#[test]
fn test_sha1_hash() {
let hash = sha1_hash(b"hello");
assert!(!hash.is_empty());
let hash_empty = sha1_hash(b"");
assert!(!hash_empty.is_empty());
let hash_fox = sha1_hash(b"The quick brown fox jumps over the lazy dog");
assert!(!hash_fox.is_empty());
}
#[test]
fn test_sha256_checksum() {
let checksum = sha256_checksum(b"hello");
assert!(!checksum.is_empty());
let checksum_empty = sha256_checksum(b"");
assert!(!checksum_empty.is_empty());
}
#[test]
fn test_crc32_checksum() {
let checksum = crc32_checksum(b"hello");
assert!(!checksum.is_empty());
let checksum_empty = crc32_checksum(b"");
assert_eq!(checksum_empty, "AAAAAA==");
let checksum_standard = crc32_checksum(b"123456789");
assert!(!checksum_standard.is_empty());
}
#[test]
fn test_checksum_algorithm_as_str() {
assert_eq!(ChecksumAlgorithm::CRC32.as_str(), "CRC32");
assert_eq!(ChecksumAlgorithm::CRC32C.as_str(), "CRC32C");
assert_eq!(ChecksumAlgorithm::SHA1.as_str(), "SHA1");
assert_eq!(ChecksumAlgorithm::SHA256.as_str(), "SHA256");
}
#[test]
fn test_checksum_algorithm_from_str() {
assert_eq!(
"CRC32".parse::<ChecksumAlgorithm>().unwrap(),
ChecksumAlgorithm::CRC32
);
assert_eq!(
"crc32c".parse::<ChecksumAlgorithm>().unwrap(),
ChecksumAlgorithm::CRC32C
);
assert_eq!(
"SHA1".parse::<ChecksumAlgorithm>().unwrap(),
ChecksumAlgorithm::SHA1
);
assert_eq!(
"sha256".parse::<ChecksumAlgorithm>().unwrap(),
ChecksumAlgorithm::SHA256
);
assert!("invalid".parse::<ChecksumAlgorithm>().is_err());
}
#[test]
fn test_compute_checksum() {
let data = b"hello world";
let crc32_result = compute_checksum(ChecksumAlgorithm::CRC32, data);
assert!(!crc32_result.is_empty());
let crc32c_result = compute_checksum(ChecksumAlgorithm::CRC32C, data);
assert!(!crc32c_result.is_empty());
let sha1_result = compute_checksum(ChecksumAlgorithm::SHA1, data);
assert!(!sha1_result.is_empty());
let sha256_result = compute_checksum(ChecksumAlgorithm::SHA256, data);
assert!(!sha256_result.is_empty());
assert_ne!(crc32_result, crc32c_result);
assert_ne!(sha1_result, sha256_result);
}
#[test]
fn test_parse_bool_true() {
assert!(parse_bool("true").unwrap());
@@ -564,6 +906,7 @@ mod tests {
#[test]
fn test_match_region_basic() {
let _result = match_region("us-east-1");
// TODO consider fixing or removing this test
// Test that match_region returns a boolean (always true)
}
@@ -647,6 +990,36 @@ mod tests {
let tags = parse_tags("Environment=Production").unwrap();
assert!(!tags.is_empty());
}
#[test]
fn test_compute_checksum_sb_matches_compute_checksum() {
// Test data
let test_data = b"The quick brown fox jumps over the lazy dog";
// Create SegmentedBytes with multiple segments to test incremental computation
let mut sb = SegmentedBytes::new();
sb.append(Bytes::from(&test_data[0..10]));
sb.append(Bytes::from(&test_data[10..25]));
sb.append(Bytes::from(&test_data[25..]));
let sb = Arc::new(sb);
// Test all algorithms
for algo in [
ChecksumAlgorithm::CRC32,
ChecksumAlgorithm::CRC32C,
ChecksumAlgorithm::CRC64NVME,
ChecksumAlgorithm::SHA1,
ChecksumAlgorithm::SHA256,
] {
let from_bytes = compute_checksum(algo, test_data);
let from_sb = compute_checksum_sb(algo, &sb);
assert_eq!(
from_bytes, from_sb,
"Mismatch for {:?}: bytes='{}' vs sb='{}'",
algo, from_bytes, from_sb
);
}
}
}
/// Gets base64-encoded MD5 hash of given data.
View File
@@ -21,10 +21,12 @@ mod client_config;
// Object operations
mod append_object;
mod get_object;
mod object_checksums;
mod object_compose;
mod object_copy;
mod object_delete;
mod object_put;
mod test_checksums;
mod upload_download_object;
// Bucket operations
tests/s3/object_checksums.rs (2031 lines): file diff suppressed because it is too large
tests/s3/test_checksums.rs (267 lines)
View File
@@ -0,0 +1,267 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2025 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use minio::s3::utils::{
ChecksumAlgorithm, compute_checksum, crc32_checksum, crc32c, crc64nvme_checksum, sha1_hash,
sha256_checksum,
};
/// Test CRC32 checksum computation
#[test]
fn test_crc32_checksum() {
let data = b"Hello, World!";
let checksum = crc32_checksum(data);
// Verify it's base64 encoded
assert!(!checksum.is_empty());
assert!(base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &checksum).is_ok());
}
/// Test CRC32C checksum computation
#[test]
fn test_crc32c_checksum() {
let data = b"Hello, World!";
let checksum = crc32c(data);
// Verify it's base64 encoded
assert!(!checksum.is_empty());
assert!(base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &checksum).is_ok());
}
/// Test CRC64-NVME checksum computation
#[test]
fn test_crc64nvme_checksum() {
let data = b"Hello, World!";
let checksum = crc64nvme_checksum(data);
// Verify it's base64 encoded
assert!(!checksum.is_empty());
assert!(base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &checksum).is_ok());
// Verify it's different from CRC32/CRC32C (different algorithms produce different results)
let crc32_result = crc32_checksum(data);
assert_ne!(checksum, crc32_result);
}
/// Test SHA1 hash computation
#[test]
fn test_sha1_hash() {
let data = b"Hello, World!";
let hash = sha1_hash(data);
// Verify it's base64 encoded
assert!(!hash.is_empty());
assert!(base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &hash).is_ok());
}
/// Test SHA256 checksum computation
#[test]
fn test_sha256_checksum() {
let data = b"Hello, World!";
let checksum = sha256_checksum(data);
// Verify it's base64 encoded
assert!(!checksum.is_empty());
assert!(base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &checksum).is_ok());
}
/// Test compute_checksum with all algorithms
#[test]
fn test_compute_checksum_all_algorithms() {
let data = b"Test data for checksums";
let crc32 = compute_checksum(ChecksumAlgorithm::CRC32, data);
let crc32c = compute_checksum(ChecksumAlgorithm::CRC32C, data);
let crc64nvme = compute_checksum(ChecksumAlgorithm::CRC64NVME, data);
let sha1 = compute_checksum(ChecksumAlgorithm::SHA1, data);
let sha256 = compute_checksum(ChecksumAlgorithm::SHA256, data);
// All should be non-empty and valid base64
for checksum in [&crc32, &crc32c, &crc64nvme, &sha1, &sha256] {
assert!(!checksum.is_empty());
assert!(
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, checksum).is_ok()
);
}
// All should be different (different algorithms)
assert_ne!(crc32, crc32c);
assert_ne!(crc32, crc64nvme);
assert_ne!(crc32, sha1);
assert_ne!(crc32, sha256);
assert_ne!(crc32c, crc64nvme);
}
/// Test that different data produces different checksums
#[test]
fn test_different_data_different_checksums() {
let data1 = b"First test data";
let data2 = b"Second test data";
// Test with each algorithm
for algorithm in [
ChecksumAlgorithm::CRC32,
ChecksumAlgorithm::CRC32C,
ChecksumAlgorithm::CRC64NVME,
ChecksumAlgorithm::SHA1,
ChecksumAlgorithm::SHA256,
] {
let checksum1 = compute_checksum(algorithm, data1);
let checksum2 = compute_checksum(algorithm, data2);
assert_ne!(
checksum1, checksum2,
"Algorithm {:?} produced same checksum for different data",
algorithm
);
}
}
/// Test that same data produces same checksums (deterministic)
#[test]
fn test_deterministic_checksums() {
let data = b"Deterministic test data";
for algorithm in [
ChecksumAlgorithm::CRC32,
ChecksumAlgorithm::CRC32C,
ChecksumAlgorithm::CRC64NVME,
ChecksumAlgorithm::SHA1,
ChecksumAlgorithm::SHA256,
] {
let checksum1 = compute_checksum(algorithm, data);
let checksum2 = compute_checksum(algorithm, data);
assert_eq!(
checksum1, checksum2,
"Algorithm {:?} is not deterministic",
algorithm
);
}
}
/// Test empty data checksums
#[test]
fn test_empty_data_checksums() {
let data = b"";
for algorithm in [
ChecksumAlgorithm::CRC32,
ChecksumAlgorithm::CRC32C,
ChecksumAlgorithm::CRC64NVME,
ChecksumAlgorithm::SHA1,
ChecksumAlgorithm::SHA256,
] {
let checksum = compute_checksum(algorithm, data);
// Empty data should still produce a valid checksum
assert!(!checksum.is_empty());
assert!(
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &checksum).is_ok()
);
}
}
/// Test large data checksums
#[test]
fn test_large_data_checksums() {
// Test with 1MB of data
let data = vec![0x42u8; 1024 * 1024];
for algorithm in [
ChecksumAlgorithm::CRC32,
ChecksumAlgorithm::CRC32C,
ChecksumAlgorithm::CRC64NVME,
ChecksumAlgorithm::SHA1,
ChecksumAlgorithm::SHA256,
] {
let checksum = compute_checksum(algorithm, &data);
assert!(!checksum.is_empty());
assert!(
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &checksum).is_ok()
);
}
}
/// Test ChecksumAlgorithm::as_str()
#[test]
fn test_checksum_algorithm_as_str() {
assert_eq!(ChecksumAlgorithm::CRC32.as_str(), "CRC32");
assert_eq!(ChecksumAlgorithm::CRC32C.as_str(), "CRC32C");
assert_eq!(ChecksumAlgorithm::CRC64NVME.as_str(), "CRC64NVME");
assert_eq!(ChecksumAlgorithm::SHA1.as_str(), "SHA1");
assert_eq!(ChecksumAlgorithm::SHA256.as_str(), "SHA256");
}
/// Test ChecksumAlgorithm::from_str()
#[test]
fn test_checksum_algorithm_from_str() {
use std::str::FromStr;
// Test uppercase
assert_eq!(
ChecksumAlgorithm::from_str("CRC32").unwrap(),
ChecksumAlgorithm::CRC32
);
assert_eq!(
ChecksumAlgorithm::from_str("CRC32C").unwrap(),
ChecksumAlgorithm::CRC32C
);
assert_eq!(
ChecksumAlgorithm::from_str("CRC64NVME").unwrap(),
ChecksumAlgorithm::CRC64NVME
);
assert_eq!(
ChecksumAlgorithm::from_str("SHA1").unwrap(),
ChecksumAlgorithm::SHA1
);
assert_eq!(
ChecksumAlgorithm::from_str("SHA256").unwrap(),
ChecksumAlgorithm::SHA256
);
// Test lowercase
assert_eq!(
ChecksumAlgorithm::from_str("crc32").unwrap(),
ChecksumAlgorithm::CRC32
);
assert_eq!(
ChecksumAlgorithm::from_str("crc32c").unwrap(),
ChecksumAlgorithm::CRC32C
);
assert_eq!(
ChecksumAlgorithm::from_str("crc64nvme").unwrap(),
ChecksumAlgorithm::CRC64NVME
);
assert_eq!(
ChecksumAlgorithm::from_str("sha1").unwrap(),
ChecksumAlgorithm::SHA1
);
assert_eq!(
ChecksumAlgorithm::from_str("sha256").unwrap(),
ChecksumAlgorithm::SHA256
);
// Test mixed case
assert_eq!(
ChecksumAlgorithm::from_str("Crc32").unwrap(),
ChecksumAlgorithm::CRC32
);
assert_eq!(
ChecksumAlgorithm::from_str("Sha256").unwrap(),
ChecksumAlgorithm::SHA256
);
// Test invalid
assert!(ChecksumAlgorithm::from_str("INVALID").is_err());
assert!(ChecksumAlgorithm::from_str("MD5").is_err());
}