diff --git a/.gitignore b/.gitignore
index 83dac5d..ef18800 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
 Cargo.lock
 .idea
 *.env
+.cargo
diff --git a/Cargo.toml b/Cargo.toml
index 697fa90..30cd744 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -50,6 +50,7 @@ features = ["native-tls", "blocking", "rustls-tls", "stream"]
 
 [dev-dependencies]
 async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
+quickcheck = "1.0.3"
 
 [[example]]
 name = "file-uploader"
diff --git a/src/lib.rs b/src/lib.rs
index bafe378..41ca111 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,3 +17,7 @@
 #![allow(clippy::result_large_err)]
 #![allow(clippy::too_many_arguments)]
 pub mod s3;
+
+#[cfg(test)]
+#[macro_use]
+extern crate quickcheck;
diff --git a/src/s3/builders/object_content.rs b/src/s3/builders/object_content.rs
index 6a566d3..90fb386 100644
--- a/src/s3/builders/object_content.rs
+++ b/src/s3/builders/object_content.rs
@@ -23,14 +23,60 @@ use tokio::fs;
 use tokio::io::AsyncWriteExt;
 use tokio_stream::StreamExt;
 
+#[cfg(test)]
+use quickcheck::Arbitrary;
+
 type IoResult<T> = Result<T, std::io::Error>;
 
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
+pub enum Size {
+    Known(u64),
+    Unknown,
+}
+
+impl Size {
+    pub fn is_known(&self) -> bool {
+        matches!(self, Size::Known(_))
+    }
+
+    pub fn is_unknown(&self) -> bool {
+        matches!(self, Size::Unknown)
+    }
+
+    pub fn as_u64(&self) -> Option<u64> {
+        match self {
+            Size::Known(v) => Some(*v),
+            Size::Unknown => None,
+        }
+    }
+}
+
+impl From<Option<u64>> for Size {
+    fn from(value: Option<u64>) -> Self {
+        match value {
+            Some(v) => Size::Known(v),
+            None => Size::Unknown,
+        }
+    }
+}
+
+#[cfg(test)]
+impl Arbitrary for Size {
+    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
+        if bool::arbitrary(g) {
+            Size::Known(u64::arbitrary(g))
+        } else {
+            Size::Unknown
+        }
+    }
+}
+
 /// Object content that can be uploaded or downloaded. Can be constructed from a stream of `Bytes`,
 /// a file path, or a `Bytes` object.
 pub struct ObjectContent(ObjectContentInner);
 
 enum ObjectContentInner {
-    Stream(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>),
+    Stream(Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>, Size),
     FilePath(PathBuf),
     Bytes(SegmentedBytes),
 }
@@ -72,28 +118,28 @@ impl Default for ObjectContent {
 
 impl ObjectContent {
     /// Create a new `ObjectContent` from a stream of `Bytes`.
     pub fn new_from_stream(
-        r: impl Stream<Item = IoResult<Bytes>> + 'static,
-        size: Option<u64>,
+        r: impl Stream<Item = IoResult<Bytes>> + Send + 'static,
+        size: impl Into<Size>,
     ) -> Self {
         let r = Box::pin(r);
-        ObjectContent(ObjectContentInner::Stream(r, size))
+        ObjectContent(ObjectContentInner::Stream(r, size.into()))
     }
 
     pub async fn to_stream(
         self,
-    ) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>)> {
+    ) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>, Size)> {
         match self.0 {
             ObjectContentInner::Stream(r, size) => Ok((r, size)),
             ObjectContentInner::FilePath(path) => {
                 let file = fs::File::open(&path).await?;
                 let size = file.metadata().await?.len();
                 let r = tokio_util::io::ReaderStream::new(file);
-                Ok((Box::pin(r), Some(size)))
+                Ok((Box::pin(r), Some(size).into()))
             }
             ObjectContentInner::Bytes(sb) => {
                 let k = sb.len();
                 let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok)));
-                Ok((r, Some(k as u64)))
+                Ok((r, Some(k as u64).into()))
             }
         }
     }
@@ -165,18 +211,21 @@ impl ObjectContent {
 }
 
 pub(crate) struct ContentStream {
-    r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>,
+    r: Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>,
     extra: Option<Bytes>,
-    size: Option<u64>,
+    size: Size,
 }
 
 impl ContentStream {
-    pub fn new(r: impl Stream<Item = IoResult<Bytes>> + 'static, size: Option<u64>) -> Self {
+    pub fn new(
+        r: impl Stream<Item = IoResult<Bytes>> + Send + 'static,
+        size: impl Into<Size>,
+    ) -> Self {
         let r = Box::pin(r);
         Self {
             r,
             extra: None,
-            size,
+            size: size.into(),
         }
     }
 
@@ -184,11 +233,11 @@ impl ContentStream {
         Self {
             r: Box::pin(tokio_stream::iter(vec![])),
             extra: None,
-            size: Some(0),
+            size: Some(0).into(),
         }
     }
 
-    pub fn get_size(&self) -> Option<u64> {
+    pub fn get_size(&self) -> Size {
         self.size
     }
diff --git a/src/s3/builders/put_object.rs b/src/s3/builders/put_object.rs
index 1e09a70..d4dfee4 100644
--- a/src/s3/builders/put_object.rs
+++ b/src/s3/builders/put_object.rs
@@ -19,15 +19,16 @@ use bytes::BytesMut;
 use http::Method;
 
 use crate::s3::{
-    builders::ContentStream,
+    builders::{ContentStream, Size},
     client::Client,
     error::Error,
     response::{
         AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2,
-        CreateMultipartUploadResponse2, PutObjectResponse2, UploadPartResponse2,
+        CreateMultipartUploadResponse2, PutObjectContentResponse, PutObjectResponse,
+        UploadPartResponse2,
     },
     sse::Sse,
-    types::{Part, Retention, S3Api, S3Request, ToS3Request},
+    types::{PartInfo, Retention, S3Api, S3Request, ToS3Request},
     utils::{check_bucket_name, md5sum_hash, merge, to_iso8601utc, urlencode, Multimap},
 };
 
@@ -211,11 +212,11 @@ pub struct CompleteMultipartUpload {
     bucket: String,
     object: String,
     upload_id: String,
-    parts: Vec<Part>,
+    parts: Vec<PartInfo>,
 }
 
 impl CompleteMultipartUpload {
-    pub fn new(bucket: &str, object: &str, upload_id: &str, parts: Vec<Part>) -> Self {
+    pub fn new(bucket: &str, object: &str, upload_id: &str, parts: Vec<PartInfo>) -> Self {
         CompleteMultipartUpload {
             bucket: bucket.to_string(),
             object: object.to_string(),
@@ -544,7 +545,7 @@ impl ToS3Request for PutObject {
 }
 
 impl S3Api for PutObject {
-    type S3Response = PutObjectResponse2;
+    type S3Response = PutObjectResponse;
 }
 
 fn object_write_args_headers(
@@ -624,7 +625,7 @@ pub struct PutObjectContent {
     tags: Option<HashMap<String, String>>,
     retention: Option<Retention>,
     legal_hold: bool,
-    part_size: Option<u64>,
+    part_size: Size,
     content_type: String,
 
     // source data
@@ -633,7 +634,7 @@
     // Computed.
     // expected_parts: Option<u16>,
     content_stream: ContentStream,
-    part_count: u16,
+    part_count: Option<u16>,
 }
 
 impl PutObjectContent {
@@ -651,10 +652,10 @@ impl PutObjectContent {
             tags: None,
             retention: None,
             legal_hold: false,
-            part_size: None,
+            part_size: Size::Unknown,
             content_type: String::from("application/octet-stream"),
             content_stream: ContentStream::empty(),
-            part_count: 0,
+            part_count: None,
         }
     }
 
@@ -703,8 +704,8 @@ impl PutObjectContent {
         self
     }
 
-    pub fn part_size(mut self, part_size: Option<u64>) -> Self {
-        self.part_size = part_size;
+    pub fn part_size(mut self, part_size: impl Into<Size>) -> Self {
+        self.part_size = part_size.into();
         self
     }
 
@@ -713,7 +714,7 @@ impl PutObjectContent {
         self
     }
 
-    pub async fn send(mut self) -> Result<PutObjectResponse2, Error> {
+    pub async fn send(mut self) -> Result<PutObjectContentResponse, Error> {
         check_bucket_name(&self.bucket, true)?;
 
         if self.object.is_empty() {
@@ -728,10 +729,13 @@ impl PutObjectContent {
             .await
             .map_err(|e| Error::IOError(e))?;
 
+        // object_size may be Size::Unknown.
         let object_size = self.content_stream.get_size();
+
         let (psize, expected_parts) = calc_part_info(object_size, self.part_size)?;
-        assert_ne!(expected_parts, Some(0));
-        self.part_size = Some(psize);
+        // Set the chosen part size and part count.
+        self.part_size = Size::Known(psize);
+        self.part_count = expected_parts;
 
         let client = self.client.clone().ok_or(Error::NoClientProvided)?;
 
@@ -746,13 +750,30 @@ impl PutObjectContent {
 
         // In the first part read, if:
         //
-        //   - we got less than the expected part size, OR
+        //   - object_size is unknown AND we got less than the part size, OR
         //   - we are expecting only one part to be uploaded,
         //
         // we upload it as a simple put object.
-        if (seg_bytes.len() as u64) < psize || expected_parts == Some(1) {
+        if (object_size.is_unknown() && (seg_bytes.len() as u64) < psize)
+            || expected_parts == Some(1)
+        {
+            let size = seg_bytes.len() as u64;
             let po = self.to_put_object(seg_bytes);
-            return po.send().await;
+            let res = po.send().await?;
+            return Ok(PutObjectContentResponse {
+                headers: res.headers,
+                bucket_name: self.bucket,
+                object_name: self.object,
+                location: res.location,
+                object_size: size,
+                etag: res.etag,
+                version_id: res.version_id,
+            });
+        } else if object_size.is_known() && (seg_bytes.len() as u64) < psize {
+            // Not enough data!
+            let expected = object_size.as_u64().unwrap();
+            let got = seg_bytes.len() as u64;
+            return Err(Error::InsufficientData(expected, got));
         }
 
         // Otherwise, we start a multipart upload.
@@ -764,16 +785,15 @@ impl PutObjectContent {
 
         let create_mpu_resp = create_mpu.send().await?;
 
-        let res = self
+        let mpu_res = self
             .send_mpu(
                 psize,
-                expected_parts,
                 create_mpu_resp.upload_id.clone(),
                 object_size,
                 seg_bytes,
             )
             .await;
-        if res.is_err() {
+        if mpu_res.is_err() {
             // If we failed to complete the multipart upload, we should abort it.
             let _ =
                 AbortMultipartUpload::new(&self.bucket, &self.object, &create_mpu_resp.upload_id)
@@ -781,26 +801,26 @@ impl PutObjectContent {
                     .send()
                     .await;
         }
-        res
+        mpu_res
     }
 
     async fn send_mpu(
         &mut self,
         psize: u64,
-        expected_parts: Option<u16>,
         upload_id: String,
-        object_size: Option<u64>,
-        seg_bytes: SegmentedBytes,
-    ) -> Result<PutObjectResponse2, Error> {
+        object_size: Size,
+        first_part: SegmentedBytes,
+    ) -> Result<PutObjectContentResponse, Error> {
         let mut done = false;
         let mut part_number = 0;
 
-        let mut parts: Vec<Part> = if let Some(pc) = expected_parts {
+        let mut parts: Vec<PartInfo> = if let Some(pc) = self.part_count {
             Vec::with_capacity(pc as usize)
         } else {
             Vec::new()
         };
-        let mut first_part = Some(seg_bytes);
+        let mut first_part = Some(first_part);
+        let mut total_read = 0;
 
         while !done {
             let part_content = {
                 if let Some(v) = first_part.take() {
@@ -811,6 +831,7 @@
             };
             part_number += 1;
             let buffer_size = part_content.len() as u64;
+            total_read += buffer_size;
 
             assert!(buffer_size <= psize, "{:?} <= {:?}", buffer_size, psize);
 
@@ -821,20 +842,24 @@
             }
 
             // Check if we have too many parts to upload.
-            if expected_parts.is_none() && part_number > MAX_MULTIPART_COUNT {
-                return Err(Error::InvalidPartCount(
-                    object_size.unwrap_or(0),
-                    self.part_size.unwrap(),
-                    self.part_count,
-                ));
+            if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT {
+                return Err(Error::TooManyParts);
+            }
+
+            if object_size.is_known() {
+                let exp = object_size.as_u64().unwrap();
+                if exp < total_read {
+                    return Err(Error::TooMuchData(exp));
+                }
             }
 
             // Upload the part now.
             let upload_part = self.to_upload_part(part_content, &upload_id, part_number);
             let upload_part_resp = upload_part.send().await?;
 
-            parts.push(Part {
+            parts.push(PartInfo {
                 number: part_number,
                 etag: upload_part_resp.etag,
+                size: buffer_size,
             });
 
             // Finally check if we are done.
@@ -844,8 +869,26 @@
         }
 
         // Complete the multipart upload.
+        let size = parts.iter().map(|p| p.size).sum();
+
+        if object_size.is_known() {
+            let expected = object_size.as_u64().unwrap();
+            if expected != size {
+                return Err(Error::InsufficientData(expected, size));
+            }
+        }
+
         let complete_mpu = self.to_complete_multipart_upload(&upload_id, parts);
-        complete_mpu.send().await
+        let res = complete_mpu.send().await?;
+        Ok(PutObjectContentResponse {
+            headers: res.headers,
+            bucket_name: self.bucket.clone(),
+            object_name: self.object.clone(),
+            location: res.location,
+            object_size: size,
+            etag: res.etag,
+            version_id: res.version_id,
+        })
     }
 }
 
@@ -896,7 +939,7 @@ impl PutObjectContent {
     fn to_complete_multipart_upload(
         &self,
         upload_id: &str,
-        parts: Vec<Part>,
+        parts: Vec<PartInfo>,
     ) -> CompleteMultipartUpload {
         CompleteMultipartUpload {
             client: self.client.clone(),
@@ -918,12 +961,9 @@ pub const MAX_MULTIPART_COUNT: u16 = 10_000;
 
 // Returns the size of each part to upload and the total number of parts. The
 // number of parts is `None` when the object size is unknown.
-fn calc_part_info(
-    object_size: Option<u64>,
-    part_size: Option<u64>,
-) -> Result<(u64, Option<u16>), Error> {
+fn calc_part_info(object_size: Size, part_size: Size) -> Result<(u64, Option<u16>), Error> {
     // Validate arguments against limits.
-    if let Some(v) = part_size {
+    if let Size::Known(v) = part_size {
         if v < MIN_PART_SIZE {
             return Err(Error::InvalidMinPartSize(v));
         }
@@ -933,18 +973,28 @@
         }
     }
 
-    if let Some(v) = object_size {
+    if let Size::Known(v) = object_size {
         if v > MAX_OBJECT_SIZE {
             return Err(Error::InvalidObjectSize(v));
         }
     }
 
     match (object_size, part_size) {
-        (None, None) => Err(Error::MissingPartSize),
-        (None, Some(part_size)) => Ok((part_size, None)),
-        (Some(object_size), None) => {
+        // If object size is unknown, part size must be provided.
+        (Size::Unknown, Size::Unknown) => Err(Error::MissingPartSize),
+
+        // If object size is unknown, and part size is known, the number of
+        // parts will be unknown, so return None for that.
+        (Size::Unknown, Size::Known(part_size)) => Ok((part_size, None)),
+
+        // If object size is known, and part size is unknown, calculate part
+        // size.
+        (Size::Known(object_size), Size::Unknown) => {
+            // 1. Calculate the minimum part size (i.e. assuming part count is
+            //    maximum).
             let mut psize: u64 = (object_size as f64 / MAX_MULTIPART_COUNT as f64).ceil() as u64;
+            // 2. Round up to the nearest multiple of MIN_PART_SIZE.
             psize = MIN_PART_SIZE * (psize as f64 / MIN_PART_SIZE as f64).ceil() as u64;
             if psize > object_size {
@@ -959,7 +1009,10 @@
 
             Ok((psize, Some(part_count)))
         }
-        (Some(object_size), Some(part_size)) => {
+
+        // If both object size and part size are known, validate the resulting
+        // part count and return.
+        (Size::Known(object_size), Size::Known(part_size)) => {
             let part_count = (object_size as f64 / part_size as f64).ceil() as u16;
             if part_count == 0 || part_count > MAX_MULTIPART_COUNT {
                 return Err(Error::InvalidPartCount(
@@ -973,3 +1026,80 @@
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    quickcheck! {
+        fn test_calc_part_info(object_size: Size, part_size: Size) -> bool {
+            let res = calc_part_info(object_size, part_size);
+
+            // Validate that basic invalid sizes return the expected error.
+            if let Size::Known(v) = part_size {
+                if v < MIN_PART_SIZE {
+                    match res {
+                        Err(Error::InvalidMinPartSize(v_err)) => return v == v_err,
+                        _ => return false,
+                    }
+                }
+                if v > MAX_PART_SIZE {
+                    match res {
+                        Err(Error::InvalidMaxPartSize(v_err)) => return v == v_err,
+                        _ => return false,
+                    }
+                }
+            }
+            if let Size::Known(v) = object_size {
+                if v > MAX_OBJECT_SIZE {
+                    match res {
+                        Err(Error::InvalidObjectSize(v_err)) => return v == v_err,
+                        _ => return false,
+                    }
+                }
+            }
+
+            // Validate the calculation of part size and part count.
+            match (object_size, part_size, res) {
+                (Size::Unknown, Size::Unknown, Err(Error::MissingPartSize)) => true,
+                (Size::Unknown, Size::Unknown, _) => false,
+
+                (Size::Unknown, Size::Known(part_size), Ok((psize, None))) => {
+                    psize == part_size
+                }
+                (Size::Unknown, Size::Known(_), _) => false,
+
+                (Size::Known(object_size), Size::Unknown, Ok((psize, Some(part_count)))) => {
+                    if object_size < MIN_PART_SIZE {
+                        return psize == object_size && part_count == 1;
+                    }
+                    if psize < MIN_PART_SIZE || psize > MAX_PART_SIZE {
+                        return false;
+                    }
+                    if psize > object_size {
+                        return false;
+                    }
+                    part_count > 0 && part_count <= MAX_MULTIPART_COUNT
+                }
+                (Size::Known(_), Size::Unknown, _) => false,
+
+                (Size::Known(object_size), Size::Known(part_size), res) => {
+                    // calc_part_info errs only when the part count comes out as 0
+                    // (empty object) or exceeds MAX_MULTIPART_COUNT.
+                    if object_size == 0 || (part_size * (MAX_MULTIPART_COUNT as u64)) < object_size {
+                        match res {
+                            Err(Error::InvalidPartCount(v1, v2, v3)) => {
+                                return v1 == object_size && v2 == part_size && v3 == MAX_MULTIPART_COUNT;
+                            }
+                            _ => return false,
+                        }
+                    }
+                    match res {
+                        Ok((psize, part_count)) => {
+                            let expected_part_count = (object_size as f64 / part_size as f64).ceil() as u16;
+                            return psize == part_size && part_count == Some(expected_part_count);
+                        }
+                        _ => false,
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/s3/client.rs b/src/s3/client.rs
index eb220e0..97902c6 100644
--- a/src/s3/client.rs
+++ b/src/s3/client.rs
@@ -2342,7 +2342,7 @@ impl Client {
         args: &mut PutObjectArgs<'_>,
         buf: &mut [u8],
         upload_id: &mut String,
-    ) -> Result<PutObjectResponse, Error> {
+    ) -> Result<PutObjectResponseOld, Error> {
         let mut headers = args.get_headers();
         if !headers.contains_key("Content-Type") {
             if args.content_type.is_empty() {
@@ -2375,7 +2375,7 @@ impl Client {
                 bytes_read = Client::read_part(&mut args.stream, buf, part_size)?;
                 if bytes_read != part_size {
-                    return Err(Error::InsufficientData(part_size, bytes_read));
+                    return Err(Error::InsufficientData(part_size as u64, bytes_read as u64));
                 }
             } else {
                 let mut size = part_size + 1;
@@ -2459,7 +2459,7 @@ impl Client {
     pub async fn put_object_old(
         &self,
         args: &mut PutObjectArgs<'_>,
-    ) -> Result<PutObjectResponse, Error> {
+    ) -> Result<PutObjectResponseOld, Error> {
         if let Some(v) = &args.sse {
             if v.tls_required() && !self.base_url.https {
                 return Err(Error::SseTlsRequired(None));
diff --git a/src/s3/client/put_object.rs b/src/s3/client/put_object.rs
index 283e293..5cf78c4 100644
--- a/src/s3/client/put_object.rs
+++ b/src/s3/client/put_object.rs
@@ -21,7 +21,7 @@ use crate::s3::{
         AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, ObjectContent,
         PutObject, PutObjectContent, SegmentedBytes, UploadPart,
     },
-    types::Part,
+    types::PartInfo,
 };
 
 impl Client {
@@ -50,7 +50,7 @@ impl Client {
         bucket: &str,
         object: &str,
         upload_id: &str,
-        parts: Vec<Part>,
+        parts: Vec<PartInfo>,
     ) -> CompleteMultipartUpload {
         CompleteMultipartUpload::new(bucket, object, upload_id, parts).client(self)
     }
diff --git a/src/s3/error.rs b/src/s3/error.rs
index fdc55be..01612dd 100644
--- a/src/s3/error.rs
+++ b/src/s3/error.rs
@@ -84,8 +84,10 @@ pub enum Error {
     InvalidObjectSize(u64),
     MissingPartSize,
     InvalidPartCount(u64, u64, u16),
+    TooManyParts,
     SseTlsRequired(Option<String>),
-    InsufficientData(usize, usize),
+    TooMuchData(u64),
+    InsufficientData(u64, u64),
     InvalidLegalHold(String),
     InvalidSelectExpression(String),
     InvalidHeaderValueType(u8),
@@ -157,15 +159,17 @@ impl fmt::Display for Error {
                 "object size {} and part size {} make more than {} parts for upload",
                 os, ps, pc
             ),
+            Error::TooManyParts => write!(f, "too many parts for upload"),
            Error::SseTlsRequired(m) => write!(
                 f,
connection", m.as_ref().map_or(String::new(), |v| v.clone()) ), - Error::InsufficientData(ps, br) => write!( + Error::TooMuchData(s) => write!(f, "too much data in the stream - exceeds {} bytes", s), + Error::InsufficientData(expected, got) => write!( f, "not enough data in the stream; expected: {}, got: {} bytes", - ps, br + expected, got ), Error::InvalidBaseUrl(m) => write!(f, "{}", m), Error::UrlBuildError(m) => write!(f, "{}", m), diff --git a/src/s3/response.rs b/src/s3/response.rs index bdac643..28f6f1d 100644 --- a/src/s3/response.rs +++ b/src/s3/response.rs @@ -45,7 +45,8 @@ pub use list_objects::{ pub use listen_bucket_notification::ListenBucketNotificationResponse; pub use put_object::{ AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2, - CreateMultipartUploadResponse2, PutObjectResponse as PutObjectResponse2, UploadPartResponse2, + CreateMultipartUploadResponse2, PutObjectContentResponse, PutObjectResponse, + UploadPartResponse2, }; #[derive(Debug)] @@ -112,7 +113,7 @@ pub type PutObjectApiResponse = PutObjectBaseResponse; pub type UploadPartResponse = PutObjectApiResponse; /// Response of [put_object()](crate::s3::client::Client::put_object) API -pub type PutObjectResponse = PutObjectApiResponse; +pub type PutObjectResponseOld = PutObjectApiResponse; /// Response of [upload_part_copy()](crate::s3::client::Client::upload_part_copy) S3 API pub type UploadPartCopyResponse = PutObjectApiResponse; @@ -332,7 +333,10 @@ impl SelectObjectContentResponse { self.prelude_read = true; for i in 0..8 { - self.prelude[i] = self.buf.pop_front().ok_or(Error::InsufficientData(8, i))?; + self.prelude[i] = self + .buf + .pop_front() + .ok_or(Error::InsufficientData(8 as u64, i as u64))?; } Ok(true) @@ -345,7 +349,10 @@ impl SelectObjectContentResponse { self.prelude_crc_read = true; for i in 0..4 { - self.prelude_crc[i] = self.buf.pop_front().ok_or(Error::InsufficientData(4, i))?; + self.prelude_crc[i] = self + .buf + .pop_front() + .ok_or(Error::InsufficientData(4 as u64, i as u64))?; } Ok(true) @@ -364,7 +371,7 @@ impl SelectObjectContentResponse { self.data.push( self.buf .pop_front() - .ok_or(Error::InsufficientData(data_length, i))?, + .ok_or(Error::InsufficientData(data_length as u64, i as u64))?, ); } @@ -378,7 +385,10 @@ impl SelectObjectContentResponse { self.message_crc_read = true; for i in 0..4 { - self.message_crc[i] = self.buf.pop_front().ok_or(Error::InsufficientData(4, i))?; + self.message_crc[i] = self + .buf + .pop_front() + .ok_or(Error::InsufficientData(4 as u64, i as u64))?; } Ok(true) diff --git a/src/s3/response/get_object.rs b/src/s3/response/get_object.rs index 8752df2..202236e 100644 --- a/src/s3/response/get_object.rs +++ b/src/s3/response/get_object.rs @@ -29,6 +29,8 @@ pub struct GetObjectResponse2 { pub object_name: String, pub version_id: Option, pub content: ObjectContent, + pub object_size: u64, + pub etag: Option, } #[async_trait] @@ -38,10 +40,13 @@ impl FromS3Response for GetObjectResponse2 { response: reqwest::Response, ) -> Result { let header_map = response.headers().clone(); - let version_id = match header_map.get("x-amz-version-id") { - Some(v) => Some(v.to_str()?.to_string()), - None => None, - }; + let version_id = header_map + .get("x-amz-version-id") + .map(|v| v.to_str().unwrap().to_string()); + + let etag = header_map + .get("etag") + .map(|v| v.to_str().unwrap().trim_matches('"').to_string()); let content_length = response .content_length() @@ -59,6 +64,8 @@ impl FromS3Response for GetObjectResponse2 { object_name: 
             object_name: req.object.unwrap().to_string(),
             version_id,
             content,
+            object_size: content_length,
+            etag,
         })
     }
 }
diff --git a/src/s3/response/put_object.rs b/src/s3/response/put_object.rs
index ee6b0ec..e5cbae9 100644
--- a/src/s3/response/put_object.rs
+++ b/src/s3/response/put_object.rs
@@ -59,10 +59,8 @@ impl FromS3Response for PutObjectResponse {
     }
 }
 
-pub type CreateMultipartUploadResponse2 = UploadIdResponse2;
-
 #[derive(Debug, Clone)]
-pub struct UploadIdResponse2 {
+pub struct CreateMultipartUploadResponse2 {
     pub headers: HeaderMap,
     pub region: String,
     pub bucket_name: String,
@@ -71,7 +69,7 @@ pub struct UploadIdResponse2 {
 }
 
 #[async_trait]
-impl FromS3Response for UploadIdResponse2 {
+impl FromS3Response for CreateMultipartUploadResponse2 {
     async fn from_s3response<'a>(
         req: S3Request<'a>,
         response: reqwest::Response,
@@ -90,8 +88,19 @@ impl FromS3Response for CreateMultipartUploadResponse2 {
     }
 }
 
-pub type AbortMultipartUploadResponse2 = UploadIdResponse2;
+pub type AbortMultipartUploadResponse2 = CreateMultipartUploadResponse2;
 
 pub type CompleteMultipartUploadResponse2 = PutObjectResponse;
 
 pub type UploadPartResponse2 = PutObjectResponse;
+
+#[derive(Debug, Clone)]
+pub struct PutObjectContentResponse {
+    pub headers: HeaderMap,
+    pub bucket_name: String,
+    pub object_name: String,
+    pub location: String,
+    pub object_size: u64,
+    pub etag: String,
+    pub version_id: Option<String>,
+}
diff --git a/src/s3/types.rs b/src/s3/types.rs
index 1576e5a..c455830 100644
--- a/src/s3/types.rs
+++ b/src/s3/types.rs
@@ -180,6 +180,14 @@ pub struct Part {
     pub etag: String,
 }
 
+#[derive(Clone, Debug)]
+pub struct PartInfo {
+    pub number: u16,
+    pub etag: String,
+
+    pub size: u64,
+}
+
 #[derive(Clone, Debug)]
 /// Contains retention mode information
 pub enum RetentionMode {
diff --git a/tests/tests.rs b/tests/tests.rs
index a470a28..44bd62e 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -315,6 +315,7 @@ impl ClientTest {
     async fn put_object_content(&self) {
         let object_name = rand_object_name();
         let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024];
+
         for size in sizes.iter() {
             let data_src = RandSrc::new(*size);
             let rsp = self
@@ -327,6 +328,7 @@ impl ClientTest {
                 .send()
                 .await
                 .unwrap();
+            assert_eq!(rsp.object_size, *size);
             let etag = rsp.etag;
             let resp = self
                 .client
@@ -340,6 +342,91 @@ impl ClientTest {
                 .await
                 .unwrap();
         }
+
+        // Repeat test with no size specified in ObjectContent
+        for size in sizes.iter() {
+            let data_src = RandSrc::new(*size);
+            let rsp = self
+                .client
+                .put_object_content(
+                    &self.test_bucket,
+                    &object_name,
+                    ObjectContent::new_from_stream(data_src, None),
+                )
+                .part_size(Some(5 * 1024 * 1024)) // Set part size to 5MB
+                .send()
+                .await
+                .unwrap();
+            assert_eq!(rsp.object_size, *size);
+            let etag = rsp.etag;
+            let resp = self
+                .client
+                .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap())
+                .await
+                .unwrap();
+            assert_eq!(resp.size, *size as usize);
+            assert_eq!(resp.etag, etag);
+            self.client
+                .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
+                .await
+                .unwrap();
+        }
+    }
+
+    // Test sending ObjectContent across async tasks.
+    async fn put_object_content_2(&self) {
+        let object_name = rand_object_name();
+        let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024];
+
+        let (sender, mut receiver): (mpsc::Sender<ObjectContent>, mpsc::Receiver<ObjectContent>) =
+            mpsc::channel(2);
+
+        let sender_handle = {
+            let sizes = sizes.clone();
+            tokio::spawn(async move {
+                for size in sizes.iter() {
+                    let data_src = RandSrc::new(*size);
+                    sender
+                        .send(ObjectContent::new_from_stream(data_src, Some(*size)))
+                        .await
+                        .unwrap();
+                }
+            })
+        };
+
+        let uploader_handler = {
+            let sizes = sizes.clone();
+            let object_name = object_name.clone();
+            let client = self.client.clone();
+            let test_bucket = self.test_bucket.clone();
+            tokio::spawn(async move {
+                let mut idx = 0;
+                while let Some(item) = receiver.recv().await {
+                    let rsp = client
+                        .put_object_content(&test_bucket, &object_name, item)
+                        .send()
+                        .await
+                        .unwrap();
+                    assert_eq!(rsp.object_size, sizes[idx]);
+                    let etag = rsp.etag;
+                    let resp = client
+                        .stat_object(&StatObjectArgs::new(&test_bucket, &object_name).unwrap())
+                        .await
+                        .unwrap();
+                    assert_eq!(resp.size, sizes[idx] as usize);
+                    assert_eq!(resp.etag, etag);
+                    client
+                        .remove_object(&RemoveObjectArgs::new(&test_bucket, &object_name).unwrap())
+                        .await
+                        .unwrap();
+
+                    idx += 1;
+                }
+            })
+        };
+
+        sender_handle.await.unwrap();
+        uploader_handler.await.unwrap();
     }
 
     async fn get_object_old(&self) {
@@ -1323,6 +1410,9 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     println!("put_object_stream()");
     ctest.put_object_content().await;
 
+    println!("put_object_stream_2()");
+    ctest.put_object_content_2().await;
+
     println!("get_object_old()");
     ctest.get_object_old().await;
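
Usage sketch: the following minimal example shows the new Size-aware upload path, distilled from the integration tests above. It is a sketch, not part of the diff: it assumes the crate is consumed as `minio` with the module paths taken from the changed files, a fully configured `Client`, and an existing bucket; the bucket and object names are placeholders.

use bytes::Bytes;
use minio::s3::builders::ObjectContent;
use minio::s3::client::Client;
use minio::s3::error::Error;

// Upload a stream whose total length is not known up front. Passing `None`
// converts to Size::Unknown via the new From<Option<u64>> impl, so an
// explicit part size is required; omitting it makes send() return
// Error::MissingPartSize.
async fn upload_unknown_length(client: &Client) -> Result<(), Error> {
    let body = tokio_stream::iter(vec![Ok::<_, std::io::Error>(Bytes::from_static(b"hello"))]);
    let content = ObjectContent::new_from_stream(body, None);

    let resp = client
        .put_object_content("my-bucket", "my-object", content)
        .part_size(Some(5 * 1024 * 1024)) // 5 MiB parts, as in the tests above
        .send()
        .await?;

    // The new PutObjectContentResponse reports the bytes actually uploaded;
    // here the whole object fits in the first part read, so it goes out as a
    // simple put rather than a multipart upload.
    assert_eq!(resp.object_size, 5);
    Ok(())
}

When the content size is known instead (e.g. `Some(*size)` as in put_object_content_2 above), send() resolves the part size eagerly through calc_part_info(), which is what enables the new InsufficientData and TooMuchData length checks during the multipart upload.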