From af8193aa9503a381fdd52db0ac18d1ddc565b1ae Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 29 May 2024 18:43:34 -0700 Subject: [PATCH] Add metadata and other options for CreateMultipart (#90) - Add validation for user-metadata keys. - Ensure that options are passed to both single part PutObject and Multipart upload when using the PutObjectContent higher level API. --- src/s3/builders/put_object.rs | 141 ++++++++++++++++++++++++++-------- src/s3/error.rs | 2 + 2 files changed, 109 insertions(+), 34 deletions(-) diff --git a/src/s3/builders/put_object.rs b/src/s3/builders/put_object.rs index d4dfee4..8f526db 100644 --- a/src/s3/builders/put_object.rs +++ b/src/s3/builders/put_object.rs @@ -46,6 +46,12 @@ pub struct CreateMultipartUpload { region: Option, bucket: String, object: String, + + user_metadata: Option, + sse: Option>, + tags: Option>, + retention: Option, + legal_hold: bool, } impl CreateMultipartUpload { @@ -76,22 +82,61 @@ impl CreateMultipartUpload { self.region = region; self } + + pub fn user_metadata(mut self, user_metadata: Option) -> Self { + self.user_metadata = user_metadata; + self + } + + pub fn sse(mut self, sse: Option>) -> Self { + self.sse = sse; + self + } + + pub fn tags(mut self, tags: Option>) -> Self { + self.tags = tags; + self + } + + pub fn retention(mut self, retention: Option) -> Self { + self.retention = retention; + self + } + + pub fn legal_hold(mut self, legal_hold: bool) -> Self { + self.legal_hold = legal_hold; + self + } + + fn get_headers(&self) -> Result { + object_write_args_headers( + self.extra_headers.as_ref(), + self.user_metadata.as_ref(), + &self.sse, + self.tags.as_ref(), + self.retention.as_ref(), + self.legal_hold, + ) + } + + fn validate(&self) -> Result<(), Error> { + check_bucket_name(&self.bucket, true)?; + + if self.object.is_empty() { + return Err(Error::InvalidObjectName(String::from( + "object name cannot be empty", + ))); + } + + Ok(()) + } } impl ToS3Request for CreateMultipartUpload { fn to_s3request(&self) -> Result { - check_bucket_name(&self.bucket, true)?; + self.validate()?; - let mut headers = Multimap::new(); - if let Some(v) = &self.extra_headers { - merge(&mut headers, v); - } - if !headers.contains_key("Content-Type") { - headers.insert( - String::from("Content-Type"), - String::from("application/octet-stream"), - ); - } + let headers = self.get_headers()?; let mut query_params = Multimap::new(); if let Some(v) = &self.extra_query_params { @@ -326,14 +371,17 @@ pub struct UploadPart { bucket: String, object: String, region: Option, - user_metadata: Option, sse: Option>, tags: Option>, retention: Option, legal_hold: bool, data: SegmentedBytes, - // These are optional as the struct is reused for PutObject. + // This is used only when this struct is used for PutObject. + user_metadata: Option, + + // These are only used for multipart UploadPart but not for PutObject, so + // they are optional. upload_id: Option, part_number: Option, } @@ -376,11 +424,6 @@ impl UploadPart { self } - pub fn user_metadata(mut self, user_metadata: Option) -> Self { - self.user_metadata = user_metadata; - self - } - pub fn sse(mut self, sse: Option>) -> Self { self.sse = sse; self @@ -401,10 +444,9 @@ impl UploadPart { self } - fn get_headers(&self) -> Multimap { + fn get_headers(&self) -> Result { object_write_args_headers( self.extra_headers.as_ref(), - None, self.user_metadata.as_ref(), &self.sse, self.tags.as_ref(), @@ -446,7 +488,7 @@ impl ToS3Request for UploadPart { fn to_s3request(&self) -> Result { self.validate()?; - let headers = self.get_headers(); + let headers = self.get_headers()?; let mut query_params = Multimap::new(); if let Some(v) = &self.extra_query_params { @@ -550,24 +592,34 @@ impl S3Api for PutObject { fn object_write_args_headers( extra_headers: Option<&Multimap>, - headers: Option<&Multimap>, user_metadata: Option<&Multimap>, sse: &Option>, tags: Option<&HashMap>, retention: Option<&Retention>, legal_hold: bool, -) -> Multimap { +) -> Result { let mut map = Multimap::new(); if let Some(v) = extra_headers { merge(&mut map, v); } - if let Some(v) = headers { - merge(&mut map, v); - } - if let Some(v) = user_metadata { + // Validate it. + for (k, _) in v.iter() { + if k.is_empty() { + return Err(Error::InvalidUserMetadata(String::from( + "user metadata key cannot be empty", + ))); + } + if !k.starts_with("x-amz-meta-") { + return Err(Error::InvalidUserMetadata(format!( + "user metadata key '{}' does not start with 'x-amz-meta-'", + k + ))); + } + } + merge(&mut map, v); } @@ -606,7 +658,15 @@ fn object_write_args_headers( ); } - map + // Set the Content-Type header if not already set. + if !map.contains_key("Content-Type") { + map.insert( + String::from("Content-Type"), + String::from("application/octet-stream"), + ); + } + + Ok(map) } // PutObjectContent takes a `ObjectContent` stream and uploads it to MinIO/S3. @@ -777,11 +837,7 @@ impl PutObjectContent { } // Otherwise, we start a multipart upload. - let create_mpu = CreateMultipartUpload::new(&self.bucket, &self.object) - .client(&client) - .extra_headers(self.extra_headers.clone()) - .extra_query_params(self.extra_query_params.clone()) - .region(self.region.clone()); + let create_mpu = self.to_create_multipart_upload(); let create_mpu_resp = create_mpu.send().await?; @@ -925,7 +981,8 @@ impl PutObjectContent { bucket: self.bucket.clone(), object: self.object.clone(), region: self.region.clone(), - user_metadata: self.user_metadata.clone(), + // User metadata is not sent with UploadPart. + user_metadata: None, sse: self.sse.clone(), tags: self.tags.clone(), retention: self.retention.clone(), @@ -952,6 +1009,22 @@ impl PutObjectContent { upload_id: upload_id.to_string(), } } + + fn to_create_multipart_upload(&self) -> CreateMultipartUpload { + CreateMultipartUpload { + client: self.client.clone(), + extra_headers: self.extra_headers.clone(), + extra_query_params: self.extra_query_params.clone(), + region: self.region.clone(), + bucket: self.bucket.clone(), + object: self.object.clone(), + user_metadata: self.user_metadata.clone(), + sse: self.sse.clone(), + tags: self.tags.clone(), + retention: self.retention.clone(), + legal_hold: self.legal_hold, + } + } } pub const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // 5 MiB diff --git a/src/s3/error.rs b/src/s3/error.rs index b3a10c9..2f114d8 100644 --- a/src/s3/error.rs +++ b/src/s3/error.rs @@ -76,6 +76,7 @@ pub enum Error { InvalidObjectName(String), InvalidUploadId(String), InvalidPartNumber(String), + InvalidUserMetadata(String), EmptyParts(String), InvalidRetentionMode(String), InvalidRetentionConfig(String), @@ -136,6 +137,7 @@ impl fmt::Display for Error { Error::InvalidObjectName(m) => write!(f, "{}", m), Error::InvalidUploadId(m) => write!(f, "{}", m), Error::InvalidPartNumber(m) => write!(f, "{}", m), + Error::InvalidUserMetadata(m) => write!(f, "{}", m), Error::EmptyParts(m) => write!(f, "{}", m), Error::InvalidRetentionMode(m) => write!(f, "invalid retention mode {}", m), Error::InvalidRetentionConfig(m) => write!(f, "invalid retention configuration; {}", m),