Return size info in put/get APIs (#82)

Also:

- fix some constraints to allow ObjectContent to be sent across async
tasks.

- use new Size enum instead of Option for specifying object content
sizes

- add tests for part size calculation and validations
This commit is contained in:
Aditya Manthramurthy 2024-04-10 07:51:42 -07:00 committed by GitHub
parent 1a36097bb8
commit 773ad9133f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 396 additions and 83 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@
Cargo.lock Cargo.lock
.idea .idea
*.env *.env
.cargo

View File

@ -50,6 +50,7 @@ features = ["native-tls", "blocking", "rustls-tls", "stream"]
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
quickcheck = "1.0.3"
[[example]] [[example]]
name = "file-uploader" name = "file-uploader"

View File

@ -17,3 +17,7 @@
#![allow(clippy::result_large_err)] #![allow(clippy::result_large_err)]
#![allow(clippy::too_many_arguments)] #![allow(clippy::too_many_arguments)]
pub mod s3; pub mod s3;
#[cfg(test)]
#[macro_use]
extern crate quickcheck;

View File

@ -23,14 +23,60 @@ use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
#[cfg(test)]
use quickcheck::Arbitrary;
type IoResult<T> = Result<T, std::io::Error>; type IoResult<T> = Result<T, std::io::Error>;
/// Size of an object or stream of content, which may be unknown in
/// advance (e.g. when uploading from a stream of indeterminate length).
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum Size {
    /// The size is known to be exactly this many bytes.
    Known(u64),
    /// The size is not known in advance.
    Unknown,
}
impl Size {
    /// Returns `true` if the size is known.
    pub fn is_known(&self) -> bool {
        match self {
            Size::Known(_) => true,
            Size::Unknown => false,
        }
    }

    /// Returns `true` if the size is unknown.
    pub fn is_unknown(&self) -> bool {
        !self.is_known()
    }

    /// Returns the size in bytes when known, `None` otherwise.
    pub fn as_u64(&self) -> Option<u64> {
        if let Size::Known(v) = self {
            Some(*v)
        } else {
            None
        }
    }
}
impl From<Option<u64>> for Size {
    /// Converts `Some(n)` into `Size::Known(n)` and `None` into
    /// `Size::Unknown`.
    fn from(value: Option<u64>) -> Self {
        value.map_or(Size::Unknown, Size::Known)
    }
}
#[cfg(test)]
impl Arbitrary for Size {
    /// Generates a random `Size` for property tests: a `Known` value with a
    /// random `u64`, or `Unknown`, each chosen by a random boolean.
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        match bool::arbitrary(g) {
            true => Size::Known(u64::arbitrary(g)),
            false => Size::Unknown,
        }
    }
}
/// Object content that can be uploaded or downloaded. Can be constructed from a stream of `Bytes`, /// Object content that can be uploaded or downloaded. Can be constructed from a stream of `Bytes`,
/// a file path, or a `Bytes` object. /// a file path, or a `Bytes` object.
pub struct ObjectContent(ObjectContentInner); pub struct ObjectContent(ObjectContentInner);
enum ObjectContentInner { enum ObjectContentInner {
Stream(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>), Stream(Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>, Size),
FilePath(PathBuf), FilePath(PathBuf),
Bytes(SegmentedBytes), Bytes(SegmentedBytes),
} }
@ -72,28 +118,28 @@ impl Default for ObjectContent {
impl ObjectContent { impl ObjectContent {
/// Create a new `ObjectContent` from a stream of `Bytes`. /// Create a new `ObjectContent` from a stream of `Bytes`.
pub fn new_from_stream( pub fn new_from_stream(
r: impl Stream<Item = IoResult<Bytes>> + 'static, r: impl Stream<Item = IoResult<Bytes>> + Send + 'static,
size: Option<u64>, size: impl Into<Size>,
) -> Self { ) -> Self {
let r = Box::pin(r); let r = Box::pin(r);
ObjectContent(ObjectContentInner::Stream(r, size)) ObjectContent(ObjectContentInner::Stream(r, size.into()))
} }
pub async fn to_stream( pub async fn to_stream(
self, self,
) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>)> { ) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>, Size)> {
match self.0 { match self.0 {
ObjectContentInner::Stream(r, size) => Ok((r, size)), ObjectContentInner::Stream(r, size) => Ok((r, size)),
ObjectContentInner::FilePath(path) => { ObjectContentInner::FilePath(path) => {
let file = fs::File::open(&path).await?; let file = fs::File::open(&path).await?;
let size = file.metadata().await?.len(); let size = file.metadata().await?.len();
let r = tokio_util::io::ReaderStream::new(file); let r = tokio_util::io::ReaderStream::new(file);
Ok((Box::pin(r), Some(size))) Ok((Box::pin(r), Some(size).into()))
} }
ObjectContentInner::Bytes(sb) => { ObjectContentInner::Bytes(sb) => {
let k = sb.len(); let k = sb.len();
let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok))); let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok)));
Ok((r, Some(k as u64))) Ok((r, Some(k as u64).into()))
} }
} }
} }
@ -165,18 +211,21 @@ impl ObjectContent {
} }
pub(crate) struct ContentStream { pub(crate) struct ContentStream {
r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, r: Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>,
extra: Option<Bytes>, extra: Option<Bytes>,
size: Option<u64>, size: Size,
} }
impl ContentStream { impl ContentStream {
pub fn new(r: impl Stream<Item = IoResult<Bytes>> + 'static, size: Option<u64>) -> Self { pub fn new(
r: impl Stream<Item = IoResult<Bytes>> + Send + 'static,
size: impl Into<Size>,
) -> Self {
let r = Box::pin(r); let r = Box::pin(r);
Self { Self {
r, r,
extra: None, extra: None,
size, size: size.into(),
} }
} }
@ -184,11 +233,11 @@ impl ContentStream {
Self { Self {
r: Box::pin(tokio_stream::iter(vec![])), r: Box::pin(tokio_stream::iter(vec![])),
extra: None, extra: None,
size: Some(0), size: Some(0).into(),
} }
} }
pub fn get_size(&self) -> Option<u64> { pub fn get_size(&self) -> Size {
self.size self.size
} }

View File

@ -19,15 +19,16 @@ use bytes::BytesMut;
use http::Method; use http::Method;
use crate::s3::{ use crate::s3::{
builders::ContentStream, builders::{ContentStream, Size},
client::Client, client::Client,
error::Error, error::Error,
response::{ response::{
AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2, AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2,
CreateMultipartUploadResponse2, PutObjectResponse2, UploadPartResponse2, CreateMultipartUploadResponse2, PutObjectContentResponse, PutObjectResponse,
UploadPartResponse2,
}, },
sse::Sse, sse::Sse,
types::{Part, Retention, S3Api, S3Request, ToS3Request}, types::{PartInfo, Retention, S3Api, S3Request, ToS3Request},
utils::{check_bucket_name, md5sum_hash, merge, to_iso8601utc, urlencode, Multimap}, utils::{check_bucket_name, md5sum_hash, merge, to_iso8601utc, urlencode, Multimap},
}; };
@ -211,11 +212,11 @@ pub struct CompleteMultipartUpload {
bucket: String, bucket: String,
object: String, object: String,
upload_id: String, upload_id: String,
parts: Vec<Part>, parts: Vec<PartInfo>,
} }
impl CompleteMultipartUpload { impl CompleteMultipartUpload {
pub fn new(bucket: &str, object: &str, upload_id: &str, parts: Vec<Part>) -> Self { pub fn new(bucket: &str, object: &str, upload_id: &str, parts: Vec<PartInfo>) -> Self {
CompleteMultipartUpload { CompleteMultipartUpload {
bucket: bucket.to_string(), bucket: bucket.to_string(),
object: object.to_string(), object: object.to_string(),
@ -544,7 +545,7 @@ impl ToS3Request for PutObject {
} }
impl S3Api for PutObject { impl S3Api for PutObject {
type S3Response = PutObjectResponse2; type S3Response = PutObjectResponse;
} }
fn object_write_args_headers( fn object_write_args_headers(
@ -624,7 +625,7 @@ pub struct PutObjectContent {
tags: Option<HashMap<String, String>>, tags: Option<HashMap<String, String>>,
retention: Option<Retention>, retention: Option<Retention>,
legal_hold: bool, legal_hold: bool,
part_size: Option<u64>, part_size: Size,
content_type: String, content_type: String,
// source data // source data
@ -633,7 +634,7 @@ pub struct PutObjectContent {
// Computed. // Computed.
// expected_parts: Option<u16>, // expected_parts: Option<u16>,
content_stream: ContentStream, content_stream: ContentStream,
part_count: u16, part_count: Option<u16>,
} }
impl PutObjectContent { impl PutObjectContent {
@ -651,10 +652,10 @@ impl PutObjectContent {
tags: None, tags: None,
retention: None, retention: None,
legal_hold: false, legal_hold: false,
part_size: None, part_size: Size::Unknown,
content_type: String::from("application/octet-stream"), content_type: String::from("application/octet-stream"),
content_stream: ContentStream::empty(), content_stream: ContentStream::empty(),
part_count: 0, part_count: None,
} }
} }
@ -703,8 +704,8 @@ impl PutObjectContent {
self self
} }
pub fn part_size(mut self, part_size: Option<u64>) -> Self { pub fn part_size(mut self, part_size: impl Into<Size>) -> Self {
self.part_size = part_size; self.part_size = part_size.into();
self self
} }
@ -713,7 +714,7 @@ impl PutObjectContent {
self self
} }
pub async fn send(mut self) -> Result<PutObjectResponse2, Error> { pub async fn send(mut self) -> Result<PutObjectContentResponse, Error> {
check_bucket_name(&self.bucket, true)?; check_bucket_name(&self.bucket, true)?;
if self.object.is_empty() { if self.object.is_empty() {
@ -728,10 +729,13 @@ impl PutObjectContent {
.await .await
.map_err(|e| Error::IOError(e))?; .map_err(|e| Error::IOError(e))?;
// object_size may be Size::Unknown.
let object_size = self.content_stream.get_size(); let object_size = self.content_stream.get_size();
let (psize, expected_parts) = calc_part_info(object_size, self.part_size)?; let (psize, expected_parts) = calc_part_info(object_size, self.part_size)?;
assert_ne!(expected_parts, Some(0)); // Set the chosen part size and part count.
self.part_size = Some(psize); self.part_size = Size::Known(psize);
self.part_count = expected_parts;
let client = self.client.clone().ok_or(Error::NoClientProvided)?; let client = self.client.clone().ok_or(Error::NoClientProvided)?;
@ -746,13 +750,30 @@ impl PutObjectContent {
// In the first part read, if: // In the first part read, if:
// //
// - we got less than the expected part size, OR // - object_size is unknown AND we got less than the part size, OR
// - we are expecting only one part to be uploaded, // - we are expecting only one part to be uploaded,
// //
// we upload it as a simple put object. // we upload it as a simple put object.
if (seg_bytes.len() as u64) < psize || expected_parts == Some(1) { if (object_size.is_unknown() && (seg_bytes.len() as u64) < psize)
|| expected_parts == Some(1)
{
let size = seg_bytes.len() as u64;
let po = self.to_put_object(seg_bytes); let po = self.to_put_object(seg_bytes);
return po.send().await; let res = po.send().await?;
return Ok(PutObjectContentResponse {
headers: res.headers,
bucket_name: self.bucket,
object_name: self.object,
location: res.location,
object_size: size,
etag: res.etag,
version_id: res.version_id,
});
} else if object_size.is_known() && (seg_bytes.len() as u64) < psize {
// Not enough data!
let expected = object_size.as_u64().unwrap();
let got = seg_bytes.len() as u64;
return Err(Error::InsufficientData(expected, got));
} }
// Otherwise, we start a multipart upload. // Otherwise, we start a multipart upload.
@ -764,16 +785,15 @@ impl PutObjectContent {
let create_mpu_resp = create_mpu.send().await?; let create_mpu_resp = create_mpu.send().await?;
let res = self let mpu_res = self
.send_mpu( .send_mpu(
psize, psize,
expected_parts,
create_mpu_resp.upload_id.clone(), create_mpu_resp.upload_id.clone(),
object_size, object_size,
seg_bytes, seg_bytes,
) )
.await; .await;
if res.is_err() { if mpu_res.is_err() {
// If we failed to complete the multipart upload, we should abort it. // If we failed to complete the multipart upload, we should abort it.
let _ = let _ =
AbortMultipartUpload::new(&self.bucket, &self.object, &create_mpu_resp.upload_id) AbortMultipartUpload::new(&self.bucket, &self.object, &create_mpu_resp.upload_id)
@ -781,26 +801,26 @@ impl PutObjectContent {
.send() .send()
.await; .await;
} }
res mpu_res
} }
async fn send_mpu( async fn send_mpu(
&mut self, &mut self,
psize: u64, psize: u64,
expected_parts: Option<u16>,
upload_id: String, upload_id: String,
object_size: Option<u64>, object_size: Size,
seg_bytes: SegmentedBytes, first_part: SegmentedBytes,
) -> Result<PutObjectResponse2, Error> { ) -> Result<PutObjectContentResponse, Error> {
let mut done = false; let mut done = false;
let mut part_number = 0; let mut part_number = 0;
let mut parts: Vec<Part> = if let Some(pc) = expected_parts { let mut parts: Vec<PartInfo> = if let Some(pc) = self.part_count {
Vec::with_capacity(pc as usize) Vec::with_capacity(pc as usize)
} else { } else {
Vec::new() Vec::new()
}; };
let mut first_part = Some(seg_bytes); let mut first_part = Some(first_part);
let mut total_read = 0;
while !done { while !done {
let part_content = { let part_content = {
if let Some(v) = first_part.take() { if let Some(v) = first_part.take() {
@ -811,6 +831,7 @@ impl PutObjectContent {
}; };
part_number += 1; part_number += 1;
let buffer_size = part_content.len() as u64; let buffer_size = part_content.len() as u64;
total_read += buffer_size;
assert!(buffer_size <= psize, "{:?} <= {:?}", buffer_size, psize); assert!(buffer_size <= psize, "{:?} <= {:?}", buffer_size, psize);
@ -821,20 +842,24 @@ impl PutObjectContent {
} }
// Check if we have too many parts to upload. // Check if we have too many parts to upload.
if expected_parts.is_none() && part_number > MAX_MULTIPART_COUNT { if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT {
return Err(Error::InvalidPartCount( return Err(Error::TooManyParts);
object_size.unwrap_or(0), }
self.part_size.unwrap(),
self.part_count, if object_size.is_known() {
)); let exp = object_size.as_u64().unwrap();
if exp < total_read {
return Err(Error::TooMuchData(exp));
}
} }
// Upload the part now. // Upload the part now.
let upload_part = self.to_upload_part(part_content, &upload_id, part_number); let upload_part = self.to_upload_part(part_content, &upload_id, part_number);
let upload_part_resp = upload_part.send().await?; let upload_part_resp = upload_part.send().await?;
parts.push(Part { parts.push(PartInfo {
number: part_number, number: part_number,
etag: upload_part_resp.etag, etag: upload_part_resp.etag,
size: buffer_size,
}); });
// Finally check if we are done. // Finally check if we are done.
@ -844,8 +869,26 @@ impl PutObjectContent {
} }
// Complete the multipart upload. // Complete the multipart upload.
let size = parts.iter().map(|p| p.size).sum();
if object_size.is_known() {
let expected = object_size.as_u64().unwrap();
if expected != size {
return Err(Error::InsufficientData(expected, size));
}
}
let complete_mpu = self.to_complete_multipart_upload(&upload_id, parts); let complete_mpu = self.to_complete_multipart_upload(&upload_id, parts);
complete_mpu.send().await let res = complete_mpu.send().await?;
Ok(PutObjectContentResponse {
headers: res.headers,
bucket_name: self.bucket.clone(),
object_name: self.object.clone(),
location: res.location,
object_size: size,
etag: res.etag,
version_id: res.version_id,
})
} }
} }
@ -896,7 +939,7 @@ impl PutObjectContent {
fn to_complete_multipart_upload( fn to_complete_multipart_upload(
&self, &self,
upload_id: &str, upload_id: &str,
parts: Vec<Part>, parts: Vec<PartInfo>,
) -> CompleteMultipartUpload { ) -> CompleteMultipartUpload {
CompleteMultipartUpload { CompleteMultipartUpload {
client: self.client.clone(), client: self.client.clone(),
@ -918,12 +961,9 @@ pub const MAX_MULTIPART_COUNT: u16 = 10_000;
// Returns the size of each part to upload and the total number of parts. The // Returns the size of each part to upload and the total number of parts. The
// number of parts is `None` when the object size is unknown. // number of parts is `None` when the object size is unknown.
fn calc_part_info( fn calc_part_info(object_size: Size, part_size: Size) -> Result<(u64, Option<u16>), Error> {
object_size: Option<u64>,
part_size: Option<u64>,
) -> Result<(u64, Option<u16>), Error> {
// Validate arguments against limits. // Validate arguments against limits.
if let Some(v) = part_size { if let Size::Known(v) = part_size {
if v < MIN_PART_SIZE { if v < MIN_PART_SIZE {
return Err(Error::InvalidMinPartSize(v)); return Err(Error::InvalidMinPartSize(v));
} }
@ -933,18 +973,28 @@ fn calc_part_info(
} }
} }
if let Some(v) = object_size { if let Size::Known(v) = object_size {
if v > MAX_OBJECT_SIZE { if v > MAX_OBJECT_SIZE {
return Err(Error::InvalidObjectSize(v)); return Err(Error::InvalidObjectSize(v));
} }
} }
match (object_size, part_size) { match (object_size, part_size) {
(None, None) => Err(Error::MissingPartSize), // If object size is unknown, part size must be provided.
(None, Some(part_size)) => Ok((part_size, None)), (Size::Unknown, Size::Unknown) => Err(Error::MissingPartSize),
(Some(object_size), None) => {
// If object size is unknown, and part size is known, the number of
// parts will be unknown, so return None for that.
(Size::Unknown, Size::Known(part_size)) => Ok((part_size, None)),
// If object size is known, and part size is unknown, calculate part
// size.
(Size::Known(object_size), Size::Unknown) => {
// 1. Calculate the minimum part size (i.e. assuming part count is
// maximum).
let mut psize: u64 = (object_size as f64 / MAX_MULTIPART_COUNT as f64).ceil() as u64; let mut psize: u64 = (object_size as f64 / MAX_MULTIPART_COUNT as f64).ceil() as u64;
// 2. Round up to the nearest multiple of MIN_PART_SIZE.
psize = MIN_PART_SIZE * (psize as f64 / MIN_PART_SIZE as f64).ceil() as u64; psize = MIN_PART_SIZE * (psize as f64 / MIN_PART_SIZE as f64).ceil() as u64;
if psize > object_size { if psize > object_size {
@ -959,7 +1009,10 @@ fn calc_part_info(
Ok((psize, Some(part_count))) Ok((psize, Some(part_count)))
} }
(Some(object_size), Some(part_size)) => {
// If both object size and part size are known, validate the resulting
// part count and return.
(Size::Known(object_size), Size::Known(part_size)) => {
let part_count = (object_size as f64 / part_size as f64).ceil() as u16; let part_count = (object_size as f64 / part_size as f64).ceil() as u16;
if part_count == 0 || part_count > MAX_MULTIPART_COUNT { if part_count == 0 || part_count > MAX_MULTIPART_COUNT {
return Err(Error::InvalidPartCount( return Err(Error::InvalidPartCount(
@ -973,3 +1026,80 @@ fn calc_part_info(
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
quickcheck! {
fn test_calc_part_info(object_size: Size, part_size: Size) -> bool {
let res = calc_part_info(object_size, part_size);
// Validate that basic invalid sizes return the expected error.
if let Size::Known(v) = part_size {
if v < MIN_PART_SIZE {
match res {
Err(Error::InvalidMinPartSize(v_err)) => return v == v_err,
_ => return false,
}
}
if v > MAX_PART_SIZE {
match res {
Err(Error::InvalidMaxPartSize(v_err)) => return v == v_err,
_ => return false,
}
}
}
if let Size::Known(v) = object_size {
if v > MAX_OBJECT_SIZE {
match res {
Err(Error::InvalidObjectSize(v_err)) => return v == v_err,
_ => return false,
}
}
}
// Validate the calculation of part size and part count.
match (object_size, part_size, res) {
(Size::Unknown, Size::Unknown, Err(Error::MissingPartSize)) => true,
(Size::Unknown, Size::Unknown, _) => false,
(Size::Unknown, Size::Known(part_size), Ok((psize, None))) => {
psize == part_size
}
(Size::Unknown, Size::Known(_), _) => false,
(Size::Known(object_size), Size::Unknown, Ok((psize, Some(part_count)))) => {
if object_size < MIN_PART_SIZE {
return psize == object_size && part_count == 1;
}
if psize < MIN_PART_SIZE || psize > MAX_PART_SIZE{
return false;
}
if psize > object_size {
return false;
}
part_count > 0 && part_count <= MAX_MULTIPART_COUNT
}
(Size::Known(_), Size::Unknown, _) => false,
(Size::Known(object_size), Size::Known(part_size), res) => {
if part_size > object_size || (part_size * (MAX_MULTIPART_COUNT as u64)) < object_size {
match res {
Err(Error::InvalidPartCount(v1, v2, v3)) => {
return v1 == object_size && v2 == part_size && v3 == MAX_MULTIPART_COUNT;
}
_ => return false,
}
}
match res {
Ok((psize, part_count)) => {
let expected_part_count = (object_size as f64 / part_size as f64).ceil() as u16;
return psize == part_size && part_count == Some(expected_part_count);
}
_ => false,
}
}
}
}
}
}

View File

@ -2342,7 +2342,7 @@ impl Client {
args: &mut PutObjectArgs<'_>, args: &mut PutObjectArgs<'_>,
buf: &mut [u8], buf: &mut [u8],
upload_id: &mut String, upload_id: &mut String,
) -> Result<PutObjectResponse, Error> { ) -> Result<PutObjectResponseOld, Error> {
let mut headers = args.get_headers(); let mut headers = args.get_headers();
if !headers.contains_key("Content-Type") { if !headers.contains_key("Content-Type") {
if args.content_type.is_empty() { if args.content_type.is_empty() {
@ -2375,7 +2375,7 @@ impl Client {
bytes_read = Client::read_part(&mut args.stream, buf, part_size)?; bytes_read = Client::read_part(&mut args.stream, buf, part_size)?;
if bytes_read != part_size { if bytes_read != part_size {
return Err(Error::InsufficientData(part_size, bytes_read)); return Err(Error::InsufficientData(part_size as u64, bytes_read as u64));
} }
} else { } else {
let mut size = part_size + 1; let mut size = part_size + 1;
@ -2459,7 +2459,7 @@ impl Client {
pub async fn put_object_old( pub async fn put_object_old(
&self, &self,
args: &mut PutObjectArgs<'_>, args: &mut PutObjectArgs<'_>,
) -> Result<PutObjectResponse, Error> { ) -> Result<PutObjectResponseOld, Error> {
if let Some(v) = &args.sse { if let Some(v) = &args.sse {
if v.tls_required() && !self.base_url.https { if v.tls_required() && !self.base_url.https {
return Err(Error::SseTlsRequired(None)); return Err(Error::SseTlsRequired(None));

View File

@ -21,7 +21,7 @@ use crate::s3::{
AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, ObjectContent, AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, ObjectContent,
PutObject, PutObjectContent, SegmentedBytes, UploadPart, PutObject, PutObjectContent, SegmentedBytes, UploadPart,
}, },
types::Part, types::PartInfo,
}; };
impl Client { impl Client {
@ -50,7 +50,7 @@ impl Client {
bucket: &str, bucket: &str,
object: &str, object: &str,
upload_id: &str, upload_id: &str,
parts: Vec<Part>, parts: Vec<PartInfo>,
) -> CompleteMultipartUpload { ) -> CompleteMultipartUpload {
CompleteMultipartUpload::new(bucket, object, upload_id, parts).client(self) CompleteMultipartUpload::new(bucket, object, upload_id, parts).client(self)
} }

View File

@ -84,8 +84,10 @@ pub enum Error {
InvalidObjectSize(u64), InvalidObjectSize(u64),
MissingPartSize, MissingPartSize,
InvalidPartCount(u64, u64, u16), InvalidPartCount(u64, u64, u16),
TooManyParts,
SseTlsRequired(Option<String>), SseTlsRequired(Option<String>),
InsufficientData(usize, usize), TooMuchData(u64),
InsufficientData(u64, u64),
InvalidLegalHold(String), InvalidLegalHold(String),
InvalidSelectExpression(String), InvalidSelectExpression(String),
InvalidHeaderValueType(u8), InvalidHeaderValueType(u8),
@ -157,15 +159,17 @@ impl fmt::Display for Error {
"object size {} and part size {} make more than {} parts for upload", "object size {} and part size {} make more than {} parts for upload",
os, ps, pc os, ps, pc
), ),
Error::TooManyParts => write!(f, "too many parts for upload"),
Error::SseTlsRequired(m) => write!( Error::SseTlsRequired(m) => write!(
f, f,
"{}SSE operation must be performed over a secure connection", "{}SSE operation must be performed over a secure connection",
m.as_ref().map_or(String::new(), |v| v.clone()) m.as_ref().map_or(String::new(), |v| v.clone())
), ),
Error::InsufficientData(ps, br) => write!( Error::TooMuchData(s) => write!(f, "too much data in the stream - exceeds {} bytes", s),
Error::InsufficientData(expected, got) => write!(
f, f,
"not enough data in the stream; expected: {}, got: {} bytes", "not enough data in the stream; expected: {}, got: {} bytes",
ps, br expected, got
), ),
Error::InvalidBaseUrl(m) => write!(f, "{}", m), Error::InvalidBaseUrl(m) => write!(f, "{}", m),
Error::UrlBuildError(m) => write!(f, "{}", m), Error::UrlBuildError(m) => write!(f, "{}", m),

View File

@ -45,7 +45,8 @@ pub use list_objects::{
pub use listen_bucket_notification::ListenBucketNotificationResponse; pub use listen_bucket_notification::ListenBucketNotificationResponse;
pub use put_object::{ pub use put_object::{
AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2, AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2,
CreateMultipartUploadResponse2, PutObjectResponse as PutObjectResponse2, UploadPartResponse2, CreateMultipartUploadResponse2, PutObjectContentResponse, PutObjectResponse,
UploadPartResponse2,
}; };
#[derive(Debug)] #[derive(Debug)]
@ -112,7 +113,7 @@ pub type PutObjectApiResponse = PutObjectBaseResponse;
pub type UploadPartResponse = PutObjectApiResponse; pub type UploadPartResponse = PutObjectApiResponse;
/// Response of [put_object()](crate::s3::client::Client::put_object) API /// Response of [put_object()](crate::s3::client::Client::put_object) API
pub type PutObjectResponse = PutObjectApiResponse; pub type PutObjectResponseOld = PutObjectApiResponse;
/// Response of [upload_part_copy()](crate::s3::client::Client::upload_part_copy) S3 API /// Response of [upload_part_copy()](crate::s3::client::Client::upload_part_copy) S3 API
pub type UploadPartCopyResponse = PutObjectApiResponse; pub type UploadPartCopyResponse = PutObjectApiResponse;
@ -332,7 +333,10 @@ impl SelectObjectContentResponse {
self.prelude_read = true; self.prelude_read = true;
for i in 0..8 { for i in 0..8 {
self.prelude[i] = self.buf.pop_front().ok_or(Error::InsufficientData(8, i))?; self.prelude[i] = self
.buf
.pop_front()
.ok_or(Error::InsufficientData(8 as u64, i as u64))?;
} }
Ok(true) Ok(true)
@ -345,7 +349,10 @@ impl SelectObjectContentResponse {
self.prelude_crc_read = true; self.prelude_crc_read = true;
for i in 0..4 { for i in 0..4 {
self.prelude_crc[i] = self.buf.pop_front().ok_or(Error::InsufficientData(4, i))?; self.prelude_crc[i] = self
.buf
.pop_front()
.ok_or(Error::InsufficientData(4 as u64, i as u64))?;
} }
Ok(true) Ok(true)
@ -364,7 +371,7 @@ impl SelectObjectContentResponse {
self.data.push( self.data.push(
self.buf self.buf
.pop_front() .pop_front()
.ok_or(Error::InsufficientData(data_length, i))?, .ok_or(Error::InsufficientData(data_length as u64, i as u64))?,
); );
} }
@ -378,7 +385,10 @@ impl SelectObjectContentResponse {
self.message_crc_read = true; self.message_crc_read = true;
for i in 0..4 { for i in 0..4 {
self.message_crc[i] = self.buf.pop_front().ok_or(Error::InsufficientData(4, i))?; self.message_crc[i] = self
.buf
.pop_front()
.ok_or(Error::InsufficientData(4 as u64, i as u64))?;
} }
Ok(true) Ok(true)

View File

@ -29,6 +29,8 @@ pub struct GetObjectResponse2 {
pub object_name: String, pub object_name: String,
pub version_id: Option<String>, pub version_id: Option<String>,
pub content: ObjectContent, pub content: ObjectContent,
pub object_size: u64,
pub etag: Option<String>,
} }
#[async_trait] #[async_trait]
@ -38,10 +40,13 @@ impl FromS3Response for GetObjectResponse2 {
response: reqwest::Response, response: reqwest::Response,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let header_map = response.headers().clone(); let header_map = response.headers().clone();
let version_id = match header_map.get("x-amz-version-id") { let version_id = header_map
Some(v) => Some(v.to_str()?.to_string()), .get("x-amz-version-id")
None => None, .map(|v| v.to_str().unwrap().to_string());
};
let etag = header_map
.get("etag")
.map(|v| v.to_str().unwrap().trim_matches('"').to_string());
let content_length = response let content_length = response
.content_length() .content_length()
@ -59,6 +64,8 @@ impl FromS3Response for GetObjectResponse2 {
object_name: req.object.unwrap().to_string(), object_name: req.object.unwrap().to_string(),
version_id, version_id,
content, content,
object_size: content_length,
etag,
}) })
} }
} }

View File

@ -59,10 +59,8 @@ impl FromS3Response for PutObjectResponse {
} }
} }
pub type CreateMultipartUploadResponse2 = UploadIdResponse2;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UploadIdResponse2 { pub struct CreateMultipartUploadResponse2 {
pub headers: HeaderMap, pub headers: HeaderMap,
pub region: String, pub region: String,
pub bucket_name: String, pub bucket_name: String,
@ -71,7 +69,7 @@ pub struct UploadIdResponse2 {
} }
#[async_trait] #[async_trait]
impl FromS3Response for UploadIdResponse2 { impl FromS3Response for CreateMultipartUploadResponse2 {
async fn from_s3response<'a>( async fn from_s3response<'a>(
req: S3Request<'a>, req: S3Request<'a>,
response: reqwest::Response, response: reqwest::Response,
@ -90,8 +88,19 @@ impl FromS3Response for UploadIdResponse2 {
} }
} }
pub type AbortMultipartUploadResponse2 = UploadIdResponse2; pub type AbortMultipartUploadResponse2 = CreateMultipartUploadResponse2;
pub type CompleteMultipartUploadResponse2 = PutObjectResponse; pub type CompleteMultipartUploadResponse2 = PutObjectResponse;
pub type UploadPartResponse2 = PutObjectResponse; pub type UploadPartResponse2 = PutObjectResponse;
/// Response of the `put_object_content` API, reporting the size of the
/// content actually uploaded along with the usual put-object metadata.
#[derive(Debug, Clone)]
pub struct PutObjectContentResponse {
    /// HTTP response headers returned by the server.
    pub headers: HeaderMap,
    /// Name of the bucket the object was uploaded to.
    pub bucket_name: String,
    /// Key (name) of the uploaded object.
    pub object_name: String,
    /// Location (region) reported for the upload.
    pub location: String,
    /// Total number of bytes uploaded, as measured from the content stream.
    pub object_size: u64,
    /// ETag of the uploaded object.
    pub etag: String,
    /// Version ID of the object, when bucket versioning is enabled.
    pub version_id: Option<String>,
}

View File

@ -180,6 +180,14 @@ pub struct Part {
pub etag: String, pub etag: String,
} }
/// Information about one uploaded part of a multipart upload. Unlike
/// `Part`, this also records the part's size so the total object size can
/// be computed.
#[derive(Clone, Debug)]
pub struct PartInfo {
    /// Part number (1-based).
    pub number: u16,
    /// ETag returned by the server for this part.
    pub etag: String,
    /// Size of this part in bytes.
    pub size: u64,
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// Contains retention mode information /// Contains retention mode information
pub enum RetentionMode { pub enum RetentionMode {

View File

@ -315,6 +315,7 @@ impl ClientTest {
async fn put_object_content(&self) { async fn put_object_content(&self) {
let object_name = rand_object_name(); let object_name = rand_object_name();
let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024]; let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024];
for size in sizes.iter() { for size in sizes.iter() {
let data_src = RandSrc::new(*size); let data_src = RandSrc::new(*size);
let rsp = self let rsp = self
@ -327,6 +328,7 @@ impl ClientTest {
.send() .send()
.await .await
.unwrap(); .unwrap();
assert_eq!(rsp.object_size, *size);
let etag = rsp.etag; let etag = rsp.etag;
let resp = self let resp = self
.client .client
@ -340,6 +342,91 @@ impl ClientTest {
.await .await
.unwrap(); .unwrap();
} }
// Repeat test with no size specified in ObjectContent
for size in sizes.iter() {
let data_src = RandSrc::new(*size);
let rsp = self
.client
.put_object_content(
&self.test_bucket,
&object_name,
ObjectContent::new_from_stream(data_src, None),
)
.part_size(Some(5 * 1024 * 1024)) // Set part size to 5MB
.send()
.await
.unwrap();
assert_eq!(rsp.object_size, *size);
let etag = rsp.etag;
let resp = self
.client
.stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await
.unwrap();
assert_eq!(resp.size, *size as usize);
assert_eq!(resp.etag, etag);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await
.unwrap();
}
}
// Test sending ObjectContent across async tasks.
async fn put_object_content_2(&self) {
let object_name = rand_object_name();
let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024];
let (sender, mut receiver): (mpsc::Sender<ObjectContent>, mpsc::Receiver<ObjectContent>) =
mpsc::channel(2);
let sender_handle = {
let sizes = sizes.clone();
tokio::spawn(async move {
for size in sizes.iter() {
let data_src = RandSrc::new(*size);
sender
.send(ObjectContent::new_from_stream(data_src, Some(*size)))
.await
.unwrap();
}
})
};
let uploader_handler = {
let sizes = sizes.clone();
let object_name = object_name.clone();
let client = self.client.clone();
let test_bucket = self.test_bucket.clone();
tokio::spawn(async move {
let mut idx = 0;
while let Some(item) = receiver.recv().await {
let rsp = client
.put_object_content(&test_bucket, &object_name, item)
.send()
.await
.unwrap();
assert_eq!(rsp.object_size, sizes[idx]);
let etag = rsp.etag;
let resp = client
.stat_object(&StatObjectArgs::new(&test_bucket, &object_name).unwrap())
.await
.unwrap();
assert_eq!(resp.size, sizes[idx] as usize);
assert_eq!(resp.etag, etag);
client
.remove_object(&RemoveObjectArgs::new(&test_bucket, &object_name).unwrap())
.await
.unwrap();
idx += 1;
}
})
};
sender_handle.await.unwrap();
uploader_handler.await.unwrap();
} }
async fn get_object_old(&self) { async fn get_object_old(&self) {
@ -1323,6 +1410,9 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("put_object_stream()"); println!("put_object_stream()");
ctest.put_object_content().await; ctest.put_object_content().await;
println!("put_object_stream_2()");
ctest.put_object_content_2().await;
println!("get_object_old()"); println!("get_object_old()");
ctest.get_object_old().await; ctest.get_object_old().await;