// MinIO Rust Library for Amazon S3 Compatible Cloud Storage // Copyright 2025 MinIO, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use crate::s3::builders::{ ContentStream, MAX_MULTIPART_COUNT, ObjectContent, Size, calc_part_info, }; use crate::s3::client::MinioClient; use crate::s3::error::ValidationErr; use crate::s3::error::{Error, IoError}; use crate::s3::header_constants::*; use crate::s3::multimap_ext::{Multimap, MultimapExt}; use crate::s3::response::a_response_traits::HasObjectSize; use crate::s3::response::{AppendObjectResponse, StatObjectResponse}; use crate::s3::segmented_bytes::SegmentedBytes; use crate::s3::sse::Sse; use crate::s3::types::{S3Api, S3Request, ToS3Request}; use crate::s3::utils::{check_bucket_name, check_object_name, check_sse}; use http::Method; use std::sync::Arc; use typed_builder::TypedBuilder; // region: append-object /// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation. /// /// This struct constructs the parameters required for the [`Client::append_object`](crate::s3::client::MinioClient::append_object) method. 
#[derive(Clone, Debug, TypedBuilder)] pub struct AppendObject { #[builder(!default)] // force required client: MinioClient, #[builder(default, setter(into))] extra_headers: Option, #[builder(default, setter(into))] extra_query_params: Option, #[builder(setter(into))] // force required + accept Into bucket: String, #[builder(setter(into))] // force required + accept Into object: String, #[builder(default, setter(into))] region: Option, #[builder(default, setter(into))] sse: Option>, #[builder(!default)] // force required data: Arc, #[builder(!default)] // force required /// value of x-amz-write-offset-bytes offset_bytes: u64, } impl S3Api for AppendObject { type S3Response = AppendObjectResponse; } /// Builder type for [`AppendObject`] that is returned by [`MinioClient::append_object`](crate::s3::client::MinioClient::append_object). /// /// This type alias simplifies the complex generic signature generated by the `typed_builder` crate. pub type AppendObjectBldr = AppendObjectBuilder<( (MinioClient,), (), (), (String,), (String,), (), (), (Arc,), (u64,), )>; impl ToS3Request for AppendObject { fn to_s3request(self) -> Result { check_bucket_name(&self.bucket, true)?; check_object_name(&self.object)?; check_sse(&self.sse, &self.client)?; let mut headers: Multimap = self.extra_headers.unwrap_or_default(); headers.add(X_AMZ_WRITE_OFFSET_BYTES, self.offset_bytes.to_string()); Ok(S3Request::builder() .client(self.client) .method(Method::PUT) .region(self.region) .bucket(self.bucket) .query_params(self.extra_query_params.unwrap_or_default()) .object(self.object) .headers(headers) .body(self.data) .build()) } } // endregion: append-object // region: append-object-content /// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation. 
///
/// This struct constructs the parameters required for the [`Client::append_object_content`](crate::s3::client::MinioClient::append_object_content) method.
/// It is a high-level API for appending content to an object in one or more parts.
///
/// `AppendObjectContent` consumes an [`ObjectContent`] stream and transparently appends it to an existing object in MinIO or S3,
/// splitting the content into parts and issuing one append request per part.
#[derive(TypedBuilder)]
pub struct AppendObjectContent {
    /// Client used for the `stat_object` lookup and for every append request.
    #[builder(!default)] // force required
    client: MinioClient,

    /// Additional headers passed to every append request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    /// Additional query parameters passed to every append request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Optional region, passed to every append request.
    #[builder(default, setter(into))]
    region: Option<String>,
    /// Name of the bucket holding the object; validated by `check_bucket_name`.
    #[builder(setter(into))] // force required + accept Into<String>
    bucket: String,
    /// Name of the object to append to; validated by `check_object_name`.
    #[builder(setter(into))] // force required + accept Into<String>
    object: String,
    /// Optional server-side encryption settings; validated by `check_sse`.
    #[builder(default)]
    sse: Option<Arc<dyn Sse>>,
    /// Requested part size; `send` passes it to `calc_part_info` and then stores the
    /// chosen size here.
    #[builder(default = Size::Unknown)]
    part_size: Size,

    /// Content to append; `send` takes it out of the struct and turns it into the
    /// content stream.
    #[builder(!default, setter(into))]
    input_content: ObjectContent,

    /// Stream the parts are read from; `send` overwrites it with the stream derived
    /// from `input_content`.
    #[builder(default = ContentStream::empty())]
    content_stream: ContentStream,
    /// Expected number of parts; `send` overwrites it with the value returned by
    /// `calc_part_info`.
    #[builder(default)]
    part_count: Option<u16>,

    /// Value of x-amz-write-offset-bytes
    ///
    /// NOTE(review): `send` takes the write offset of the append requests from the
    /// object's current size (via `stat_object`), not from this field.
    #[builder(default)]
    offset_bytes: u64,
}

/// Builder type for [`AppendObjectContent`] that is returned by [`MinioClient::append_object_content`](crate::s3::client::MinioClient::append_object_content).
///
/// This type alias simplifies the complex generic signature generated by the `typed_builder` crate.
pub type AppendObjectContentBldr = AppendObjectContentBuilder<( (MinioClient,), (), (), (), (String,), (String,), (), (), (ObjectContent,), (), (), (), )>; impl AppendObjectContent { pub async fn send(mut self) -> Result { check_bucket_name(&self.bucket, true)?; check_object_name(&self.object)?; check_sse(&self.sse, &self.client)?; { let mut headers: Multimap = match self.extra_headers { Some(ref headers) => headers.clone(), None => Multimap::new(), }; headers.add(X_AMZ_WRITE_OFFSET_BYTES, self.offset_bytes.to_string()); self.extra_query_params = Some(headers); } self.content_stream = std::mem::take(&mut self.input_content) .to_content_stream() .await .map_err(IoError::from)?; // object_size may be Size::Unknown. let object_size = self.content_stream.get_size(); let (part_size, n_expected_parts) = calc_part_info(object_size, self.part_size)?; // Set the chosen part size and part count. self.part_size = Size::Known(part_size); self.part_count = n_expected_parts; // Read the first part. let seg_bytes = self .content_stream .read_upto(part_size as usize) .await .map_err(IoError::from)?; // get the length (if any) of the current file let resp: StatObjectResponse = self .client .stat_object(&self.bucket, &self.object) .build() .send() .await?; //println!("statObjectResponse={:#?}", resp); let current_file_size = resp.size()?; // In the first part read, if: // // - object_size is unknown AND we got less than the part size, OR // - we are expecting only one part to be uploaded, // // we upload it as a simple put object. 
if (object_size.is_unknown() && (seg_bytes.len() as u64) < part_size) || n_expected_parts == Some(1) { let ao = AppendObject { client: self.client, extra_headers: self.extra_headers, extra_query_params: self.extra_query_params, bucket: self.bucket, object: self.object, region: self.region, offset_bytes: current_file_size, sse: self.sse, data: Arc::new(seg_bytes), }; ao.send().await } else if let Some(expected) = object_size.value() && (seg_bytes.len() as u64) < part_size { // Not enough data! let got = seg_bytes.len() as u64; Err(ValidationErr::InsufficientData { expected, got })? } else { // Otherwise, we start a multipart append. self.send_mpa(part_size, current_file_size, seg_bytes).await } } /// multipart append async fn send_mpa( &mut self, part_size: u64, object_size: u64, first_part: SegmentedBytes, ) -> Result { let mut done = false; let mut part_number = 0; let mut last_resp: Option = None; let mut next_offset_bytes: u64 = object_size; //println!("initial offset_bytes: {}", next_offset_bytes); let mut first_part = Some(first_part); while !done { let part_content: SegmentedBytes = { if let Some(v) = first_part.take() { v } else { self.content_stream .read_upto(part_size as usize) .await .map_err(IoError::from)? } }; part_number += 1; let buffer_size = part_content.len() as u64; assert!(buffer_size <= part_size, "{buffer_size} <= {part_size}",); if buffer_size == 0 && part_number > 1 { // We are done as we appended at least 1 part and we have // reached the end of the stream. break; } // Check if we have too many parts to upload. if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT { return Err(ValidationErr::TooManyParts(part_number as u64).into()); } // Append the part now. 
let append_object = AppendObject { client: self.client.clone(), extra_headers: self.extra_headers.clone(), extra_query_params: self.extra_query_params.clone(), bucket: self.bucket.clone(), object: self.object.clone(), region: self.region.clone(), sse: self.sse.clone(), data: Arc::new(part_content), offset_bytes: next_offset_bytes, }; let resp: AppendObjectResponse = append_object.send().await?; //println!("AppendObjectResponse: object_size={:?}", resp.object_size); next_offset_bytes = resp.object_size(); // Finally check if we are done. if buffer_size < part_size { done = true; last_resp = Some(resp); } } Ok(last_resp.unwrap()) } } // endregion: append-object-content