Add metadata and other options for CreateMultipart (#90)

- Add validation for user-metadata keys.

- Ensure that options are passed to both single part PutObject and
  Multipart upload when using the PutObjectContent higher level API.
This commit is contained in:
Aditya Manthramurthy 2024-05-29 18:43:34 -07:00 committed by GitHub
parent 18c5707a4a
commit af8193aa95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 109 additions and 34 deletions

View File

@ -46,6 +46,12 @@ pub struct CreateMultipartUpload {
region: Option<String>,
bucket: String,
object: String,
user_metadata: Option<Multimap>,
sse: Option<Arc<dyn Sse>>,
tags: Option<HashMap<String, String>>,
retention: Option<Retention>,
legal_hold: bool,
}
impl CreateMultipartUpload {
@ -76,22 +82,61 @@ impl CreateMultipartUpload {
self.region = region;
self
}
pub fn user_metadata(mut self, user_metadata: Option<Multimap>) -> Self {
self.user_metadata = user_metadata;
self
}
pub fn sse(mut self, sse: Option<Arc<dyn Sse>>) -> Self {
self.sse = sse;
self
}
pub fn tags(mut self, tags: Option<HashMap<String, String>>) -> Self {
self.tags = tags;
self
}
pub fn retention(mut self, retention: Option<Retention>) -> Self {
self.retention = retention;
self
}
pub fn legal_hold(mut self, legal_hold: bool) -> Self {
self.legal_hold = legal_hold;
self
}
fn get_headers(&self) -> Result<Multimap, Error> {
object_write_args_headers(
self.extra_headers.as_ref(),
self.user_metadata.as_ref(),
&self.sse,
self.tags.as_ref(),
self.retention.as_ref(),
self.legal_hold,
)
}
fn validate(&self) -> Result<(), Error> {
check_bucket_name(&self.bucket, true)?;
if self.object.is_empty() {
return Err(Error::InvalidObjectName(String::from(
"object name cannot be empty",
)));
}
Ok(())
}
}
impl ToS3Request for CreateMultipartUpload {
fn to_s3request(&self) -> Result<S3Request, Error> {
check_bucket_name(&self.bucket, true)?;
self.validate()?;
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
if !headers.contains_key("Content-Type") {
headers.insert(
String::from("Content-Type"),
String::from("application/octet-stream"),
);
}
let headers = self.get_headers()?;
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
@ -326,14 +371,17 @@ pub struct UploadPart {
bucket: String,
object: String,
region: Option<String>,
user_metadata: Option<Multimap>,
sse: Option<Arc<dyn Sse>>,
tags: Option<HashMap<String, String>>,
retention: Option<Retention>,
legal_hold: bool,
data: SegmentedBytes,
// These are optional as the struct is reused for PutObject.
// This is used only when this struct is used for PutObject.
user_metadata: Option<Multimap>,
// These are only used for multipart UploadPart but not for PutObject, so
// they are optional.
upload_id: Option<String>,
part_number: Option<u16>,
}
@ -376,11 +424,6 @@ impl UploadPart {
self
}
pub fn user_metadata(mut self, user_metadata: Option<Multimap>) -> Self {
self.user_metadata = user_metadata;
self
}
pub fn sse(mut self, sse: Option<Arc<dyn Sse>>) -> Self {
self.sse = sse;
self
@ -401,10 +444,9 @@ impl UploadPart {
self
}
fn get_headers(&self) -> Multimap {
fn get_headers(&self) -> Result<Multimap, Error> {
object_write_args_headers(
self.extra_headers.as_ref(),
None,
self.user_metadata.as_ref(),
&self.sse,
self.tags.as_ref(),
@ -446,7 +488,7 @@ impl ToS3Request for UploadPart {
fn to_s3request(&self) -> Result<S3Request, Error> {
self.validate()?;
let headers = self.get_headers();
let headers = self.get_headers()?;
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
@ -550,24 +592,34 @@ impl S3Api for PutObject {
fn object_write_args_headers(
extra_headers: Option<&Multimap>,
headers: Option<&Multimap>,
user_metadata: Option<&Multimap>,
sse: &Option<Arc<dyn Sse>>,
tags: Option<&HashMap<String, String>>,
retention: Option<&Retention>,
legal_hold: bool,
) -> Multimap {
) -> Result<Multimap, Error> {
let mut map = Multimap::new();
if let Some(v) = extra_headers {
merge(&mut map, v);
}
if let Some(v) = headers {
merge(&mut map, v);
}
if let Some(v) = user_metadata {
// Validate it.
for (k, _) in v.iter() {
if k.is_empty() {
return Err(Error::InvalidUserMetadata(String::from(
"user metadata key cannot be empty",
)));
}
if !k.starts_with("x-amz-meta-") {
return Err(Error::InvalidUserMetadata(format!(
"user metadata key '{}' does not start with 'x-amz-meta-'",
k
)));
}
}
merge(&mut map, v);
}
@ -606,7 +658,15 @@ fn object_write_args_headers(
);
}
map
// Set the Content-Type header if not already set.
if !map.contains_key("Content-Type") {
map.insert(
String::from("Content-Type"),
String::from("application/octet-stream"),
);
}
Ok(map)
}
// PutObjectContent takes a `ObjectContent` stream and uploads it to MinIO/S3.
@ -777,11 +837,7 @@ impl PutObjectContent {
}
// Otherwise, we start a multipart upload.
let create_mpu = CreateMultipartUpload::new(&self.bucket, &self.object)
.client(&client)
.extra_headers(self.extra_headers.clone())
.extra_query_params(self.extra_query_params.clone())
.region(self.region.clone());
let create_mpu = self.to_create_multipart_upload();
let create_mpu_resp = create_mpu.send().await?;
@ -925,7 +981,8 @@ impl PutObjectContent {
bucket: self.bucket.clone(),
object: self.object.clone(),
region: self.region.clone(),
user_metadata: self.user_metadata.clone(),
// User metadata is not sent with UploadPart.
user_metadata: None,
sse: self.sse.clone(),
tags: self.tags.clone(),
retention: self.retention.clone(),
@ -952,6 +1009,22 @@ impl PutObjectContent {
upload_id: upload_id.to_string(),
}
}
fn to_create_multipart_upload(&self) -> CreateMultipartUpload {
CreateMultipartUpload {
client: self.client.clone(),
extra_headers: self.extra_headers.clone(),
extra_query_params: self.extra_query_params.clone(),
region: self.region.clone(),
bucket: self.bucket.clone(),
object: self.object.clone(),
user_metadata: self.user_metadata.clone(),
sse: self.sse.clone(),
tags: self.tags.clone(),
retention: self.retention.clone(),
legal_hold: self.legal_hold,
}
}
}
pub const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // 5 MiB

View File

@ -76,6 +76,7 @@ pub enum Error {
InvalidObjectName(String),
InvalidUploadId(String),
InvalidPartNumber(String),
InvalidUserMetadata(String),
EmptyParts(String),
InvalidRetentionMode(String),
InvalidRetentionConfig(String),
@ -136,6 +137,7 @@ impl fmt::Display for Error {
Error::InvalidObjectName(m) => write!(f, "{}", m),
Error::InvalidUploadId(m) => write!(f, "{}", m),
Error::InvalidPartNumber(m) => write!(f, "{}", m),
Error::InvalidUserMetadata(m) => write!(f, "{}", m),
Error::EmptyParts(m) => write!(f, "{}", m),
Error::InvalidRetentionMode(m) => write!(f, "invalid retention mode {}", m),
Error::InvalidRetentionConfig(m) => write!(f, "invalid retention configuration; {}", m),