diff --git a/README.md b/README.md
index 8a70c7a..f2491d8 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,64 @@
-# MinIO Rust SDK for Amazon S3 Compatible Cloud Storage [![Slack](https://slack.min.io/slack?type=svg)](https://slack.min.io)
+# MinIO Rust SDK for Amazon S3 Compatible Cloud Storage [![Slack](https://slack.min.io/slack?type=svg)](https://slack.min.io) [![Sourcegraph](https://sourcegraph.com/github.com/minio/minio-rs/-/badge.svg)](https://sourcegraph.com/github.com/minio/minio-rs?badge) [![Apache V2 License](https://img.shields.io/badge/license-Apache%20V2-blue.svg)](https://github.com/minio/minio-rs/blob/master/LICENSE)
+
+The MinIO Rust SDK is a Simple Storage Service (aka S3) client for performing bucket and object operations against any Amazon S3 compatible object storage service.
+
+For a complete list of APIs and examples, please take a look at the [MinIO Rust Client API Reference](https://minio-rs.min.io/).
+
+## Example: file-uploader.rs
+```rust
+use minio::s3::args::{BucketExistsArgs, MakeBucketArgs, UploadObjectArgs};
+use minio::s3::client::Client;
+use minio::s3::creds::StaticProvider;
+use minio::s3::http::BaseUrl;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let mut base_url = BaseUrl::from_string("play.min.io").unwrap();
+    base_url.https = true;
+
+    let static_provider = StaticProvider::new(
+        "Q3AM3UQ867SPQQA43P2F",
+        "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
+        None,
+    );
+
+    let client = Client::new(base_url.clone(), Some(&static_provider));
+
+    let bucket_name = "asiatrip";
+
+    // Check whether the 'asiatrip' bucket exists.
+    let exists = client
+        .bucket_exists(&BucketExistsArgs::new(&bucket_name).unwrap())
+        .await
+        .unwrap();
+
+    // Make the 'asiatrip' bucket if it does not exist.
+    if !exists {
+        client
+            .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap())
+            .await
+            .unwrap();
+    }
+
+    // Upload '/home/user/Photos/asiaphotos.zip' as object name
+    // 'asiaphotos-2015.zip' to bucket 'asiatrip'.
+    client
+        .upload_object(
+            &mut UploadObjectArgs::new(
+                &bucket_name,
+                "asiaphotos-2015.zip",
+                "/home/user/Photos/asiaphotos.zip",
+            )
+            .unwrap(),
+        )
+        .await
+        .unwrap();
+
+    println!("'/home/user/Photos/asiaphotos.zip' is successfully uploaded as object 'asiaphotos-2015.zip' to bucket 'asiatrip'.");
+
+    Ok(())
+}
+```
+
 ## License
-MinIO Rust SDK is distributed under the terms of the Apache 2.0 license.
-
-See [LICENSE](LICENSE) for details.
+This SDK is distributed under the [Apache License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0); see [LICENSE](https://github.com/minio/minio-rs/blob/master/LICENSE) for more information.
diff --git a/src/s3/args.rs b/src/s3/args.rs
index 267a831..2546e82 100644
--- a/src/s3/args.rs
+++ b/src/s3/args.rs
@@ -14,27 +14,34 @@
 // limitations under the License.
use crate::s3::error::Error; +use crate::s3::signer::post_presign_v4; use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::types::{ - DeleteObject, Directive, Item, LifecycleConfig, NotificationRecords, Part, Retention, - SelectRequest, SseConfig, + DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, NotificationRecords, + ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig, }; use crate::s3::utils::{ - check_bucket_name, merge, to_http_header_value, to_iso8601utc, urlencode, Multimap, UtcTime, + b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc, + to_signer_date, urlencode, utc_now, Multimap, UtcTime, }; use derivative::Derivative; +use hyper::http::Method; +use serde_json::json; +use serde_json::Value; +use std::collections::HashMap; pub const MIN_PART_SIZE: usize = 5_242_880; // 5 MiB pub const MAX_PART_SIZE: usize = 5_368_709_120; // 5 GiB pub const MAX_OBJECT_SIZE: usize = 5_497_558_138_880; // 5 TiB pub const MAX_MULTIPART_COUNT: u16 = 10_000; +pub const DEFAULT_EXPIRY_SECONDS: u32 = 604_800; // 7 days fn object_write_args_headers( extra_headers: Option<&Multimap>, headers: Option<&Multimap>, user_metadata: Option<&Multimap>, sse: Option<&dyn Sse>, - tags: Option<&std::collections::HashMap>, + tags: Option<&HashMap>, retention: Option<&Retention>, legal_hold: bool, ) -> Multimap { @@ -397,7 +404,7 @@ pub struct PutObjectApiArgs<'a> { pub headers: Option<&'a Multimap>, pub user_metadata: Option<&'a Multimap>, pub sse: Option<&'a dyn Sse>, - pub tags: Option<&'a std::collections::HashMap>, + pub tags: Option<&'a HashMap>, pub retention: Option<&'a Retention>, pub legal_hold: bool, pub data: &'a [u8], @@ -458,7 +465,7 @@ pub struct UploadPartArgs<'a> { pub headers: Option<&'a Multimap>, pub user_metadata: Option<&'a Multimap>, pub sse: Option<&'a dyn Sse>, - pub tags: Option<&'a std::collections::HashMap>, + pub tags: Option<&'a HashMap>, pub retention: Option<&'a Retention>, pub legal_hold: bool, pub upload_id: &'a str, @@ -534,7 +541,7 @@ pub struct PutObjectArgs<'a> { pub headers: Option<&'a Multimap>, pub user_metadata: Option<&'a Multimap>, pub sse: Option<&'a dyn Sse>, - pub tags: Option<&'a std::collections::HashMap>, + pub tags: Option<&'a HashMap>, pub retention: Option<&'a Retention>, pub legal_hold: bool, pub object_size: Option, @@ -1090,7 +1097,7 @@ pub struct CopyObjectArgs<'a> { pub headers: Option<&'a Multimap>, pub user_metadata: Option<&'a Multimap>, pub sse: Option<&'a dyn Sse>, - pub tags: Option<&'a std::collections::HashMap>, + pub tags: Option<&'a HashMap>, pub retention: Option<&'a Retention>, pub legal_hold: bool, pub source: CopySource<'a>, @@ -1297,7 +1304,7 @@ pub struct ComposeObjectArgs<'a> { pub headers: Option<&'a Multimap>, pub user_metadata: Option<&'a Multimap>, pub sse: Option<&'a dyn Sse>, - pub tags: Option<&'a std::collections::HashMap>, + pub tags: Option<&'a HashMap>, pub retention: Option<&'a Retention>, pub legal_hold: bool, pub sources: &'a mut Vec>, @@ -1393,3 +1400,589 @@ pub struct SetBucketLifecycleArgs<'a> { pub bucket: &'a str, pub config: &'a LifecycleConfig, } + +pub type DeleteBucketNotificationArgs<'a> = BucketArgs<'a>; + +pub type GetBucketNotificationArgs<'a> = BucketArgs<'a>; + +pub struct SetBucketNotificationArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub config: &'a NotificationConfig, +} + +impl<'a> 
SetBucketNotificationArgs<'a> { + pub fn new( + bucket_name: &'a str, + config: &'a NotificationConfig, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + Ok(SetBucketNotificationArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + config: config, + }) + } +} + +pub type DeleteBucketPolicyArgs<'a> = BucketArgs<'a>; + +pub type GetBucketPolicyArgs<'a> = BucketArgs<'a>; + +pub struct SetBucketPolicyArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub config: &'a str, +} + +impl<'a> SetBucketPolicyArgs<'a> { + pub fn new(bucket_name: &'a str, config: &'a str) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + Ok(SetBucketPolicyArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + config: config, + }) + } +} + +pub type DeleteBucketReplicationArgs<'a> = BucketArgs<'a>; + +pub type GetBucketReplicationArgs<'a> = BucketArgs<'a>; + +pub struct SetBucketReplicationArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub config: &'a ReplicationConfig, +} + +pub type DeleteBucketTagsArgs<'a> = BucketArgs<'a>; + +pub type GetBucketTagsArgs<'a> = BucketArgs<'a>; + +pub struct SetBucketTagsArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub tags: &'a HashMap, +} + +impl<'a> SetBucketTagsArgs<'a> { + pub fn new( + bucket_name: &'a str, + tags: &'a HashMap, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + Ok(SetBucketTagsArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + tags: tags, + }) + } +} + +pub type GetBucketVersioningArgs<'a> = BucketArgs<'a>; + +pub struct SetBucketVersioningArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub status: bool, + pub mfa_delete: Option, +} + +impl<'a> SetBucketVersioningArgs<'a> { + pub fn new(bucket_name: &'a str, status: bool) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + Ok(SetBucketVersioningArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + status: status, + mfa_delete: None, + }) + } +} + +pub type DeleteObjectLockConfigArgs<'a> = BucketArgs<'a>; + +pub type GetObjectLockConfigArgs<'a> = BucketArgs<'a>; + +pub struct SetObjectLockConfigArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub config: &'a ObjectLockConfig, +} + +impl<'a> SetObjectLockConfigArgs<'a> { + pub fn new( + bucket_name: &'a str, + config: &'a ObjectLockConfig, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + Ok(SetObjectLockConfigArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + config: config, + }) + } +} + +pub type GetObjectRetentionArgs<'a> = ObjectVersionArgs<'a>; + +pub struct SetObjectRetentionArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub object: &'a str, + pub version_id: Option<&'a str>, + pub 
bypass_governance_mode: bool, + pub retention_mode: Option, + pub retain_until_date: Option, +} + +impl<'a> SetObjectRetentionArgs<'a> { + pub fn new( + bucket_name: &'a str, + object_name: &'a str, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + if object_name.is_empty() { + return Err(Error::InvalidObjectName(String::from( + "object name cannot be empty", + ))); + } + + Ok(SetObjectRetentionArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + object: object_name, + version_id: None, + bypass_governance_mode: false, + retention_mode: None, + retain_until_date: None, + }) + } +} + +pub type DeleteObjectTagsArgs<'a> = ObjectVersionArgs<'a>; + +pub type GetObjectTagsArgs<'a> = ObjectVersionArgs<'a>; + +pub struct SetObjectTagsArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub object: &'a str, + pub version_id: Option<&'a str>, + pub tags: &'a HashMap, +} + +impl<'a> SetObjectTagsArgs<'a> { + pub fn new( + bucket_name: &'a str, + object_name: &'a str, + tags: &'a HashMap, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + if object_name.is_empty() { + return Err(Error::InvalidObjectName(String::from( + "object name cannot be empty", + ))); + } + + Ok(SetObjectTagsArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + object: object_name, + version_id: None, + tags: tags, + }) + } +} + +pub struct GetPresignedObjectUrlArgs<'a> { + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub object: &'a str, + pub version_id: Option<&'a str>, + pub method: Method, + pub expiry_seconds: Option, + pub request_time: Option, +} + +impl<'a> GetPresignedObjectUrlArgs<'a> { + pub fn new( + bucket_name: &'a str, + object_name: &'a str, + method: Method, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + if object_name.is_empty() { + return Err(Error::InvalidObjectName(String::from( + "object name cannot be empty", + ))); + } + + Ok(GetPresignedObjectUrlArgs { + extra_query_params: None, + region: None, + bucket: bucket_name, + object: object_name, + version_id: None, + method: method, + expiry_seconds: Some(DEFAULT_EXPIRY_SECONDS), + request_time: None, + }) + } +} + +pub struct PostPolicy<'a> { + pub region: Option<&'a str>, + pub bucket: &'a str, + + expiration: &'a UtcTime, + eq_conditions: HashMap, + starts_with_conditions: HashMap, + lower_limit: Option, + upper_limit: Option, +} + +impl<'a> PostPolicy<'a> { + const EQ: &str = "eq"; + const STARTS_WITH: &str = "starts-with"; + const ALGORITHM: &str = "AWS4-HMAC-SHA256"; + + pub fn new(bucket_name: &'a str, expiration: &'a UtcTime) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + Ok(PostPolicy { + region: None, + bucket: bucket_name, + expiration: expiration, + eq_conditions: HashMap::new(), + starts_with_conditions: HashMap::new(), + lower_limit: None, + upper_limit: None, + }) + } + + fn trim_dollar(value: &str) -> String { + let mut s = value.to_string(); + if s.starts_with("$") { + s.remove(0); + } + return s; + } + + fn is_reserved_element(element: &str) -> bool { + return element == "bucket" + || element == "x-amz-algorithm" + || element == "x-amz-credential" + || element == "x-amz-date" + || element == "policy" + || element == "x-amz-signature"; + } + + fn get_credential_string(access_key: &String, date: &UtcTime, region: 
&String) -> String { + return format!( + "{}/{}/{}/s3/aws4_request", + access_key, + to_signer_date(*date), + region + ); + } + + pub fn add_equals_condition(&mut self, element: &str, value: &str) -> Result<(), Error> { + if element.is_empty() { + return Err(Error::PostPolicyError(format!( + "condition element cannot be empty" + ))); + } + + let v = PostPolicy::trim_dollar(element); + if v == "success_action_redirect" || v == "redirect" || v == "content-length-range" { + return Err(Error::PostPolicyError(format!( + "{} is unsupported for equals condition", + element + ))); + } + + if PostPolicy::is_reserved_element(&v.as_str()) { + return Err(Error::PostPolicyError(format!("{} cannot set", element))); + } + + self.eq_conditions.insert(v, value.to_string()); + Ok(()) + } + + pub fn remove_equals_condition(&mut self, element: &str) { + self.eq_conditions.remove(element); + } + + pub fn add_starts_with_condition(&mut self, element: &str, value: &str) -> Result<(), Error> { + if element.is_empty() { + return Err(Error::PostPolicyError(format!( + "condition element cannot be empty" + ))); + } + + let v = PostPolicy::trim_dollar(element); + if v == "success_action_status" + || v == "content-length-range" + || (v.starts_with("x-amz-") && v.starts_with("x-amz-meta-")) + { + return Err(Error::PostPolicyError(format!( + "{} is unsupported for starts-with condition", + element + ))); + } + + if PostPolicy::is_reserved_element(&v.as_str()) { + return Err(Error::PostPolicyError(format!("{} cannot set", element))); + } + + self.starts_with_conditions + .insert(v.clone(), value.to_string()); + Ok(()) + } + + pub fn remove_starts_with_condition(&mut self, element: &str) { + self.starts_with_conditions.remove(element); + } + + pub fn add_content_length_range_condition( + &mut self, + lower_limit: usize, + upper_limit: usize, + ) -> Result<(), Error> { + if lower_limit > upper_limit { + return Err(Error::PostPolicyError(format!( + "lower limit cannot be greater than upper limit" + ))); + } + + self.lower_limit = Some(lower_limit); + self.upper_limit = Some(upper_limit); + Ok(()) + } + + pub fn remove_content_length_range_condition(&mut self) { + self.lower_limit = None; + self.upper_limit = None; + } + + pub fn form_data( + &self, + access_key: String, + secret_key: String, + session_token: Option, + region: String, + ) -> Result, Error> { + if region.is_empty() { + return Err(Error::PostPolicyError(format!("region cannot be empty"))); + } + + if !self.eq_conditions.contains_key("key") + && !self.starts_with_conditions.contains_key("key") + { + return Err(Error::PostPolicyError(format!("key condition must be set"))); + } + + let mut conditions: Vec = Vec::new(); + conditions.push(json!([PostPolicy::EQ, "$bucket", self.bucket])); + for (key, value) in &self.eq_conditions { + conditions.push(json!([PostPolicy::EQ, String::from("$") + &key, value])); + } + for (key, value) in &self.starts_with_conditions { + conditions.push(json!([ + PostPolicy::STARTS_WITH, + String::from("$") + &key, + value + ])); + } + if self.lower_limit.is_some() && self.upper_limit.is_some() { + conditions.push(json!([ + "content-length-range", + self.lower_limit.unwrap(), + self.upper_limit.unwrap() + ])); + } + + let date = utc_now(); + let credential = PostPolicy::get_credential_string(&access_key, &date, ®ion); + let amz_date = to_amz_date(date); + conditions.push(json!([ + PostPolicy::EQ, + "$x-amz-algorithm", + PostPolicy::ALGORITHM + ])); + conditions.push(json!([PostPolicy::EQ, "$x-amz-credential", credential])); + if let 
Some(v) = &session_token { + conditions.push(json!([PostPolicy::EQ, "$x-amz-security-token", v])); + } + conditions.push(json!([PostPolicy::EQ, "$x-amz-date", amz_date])); + + let policy = json!({ + "expiration": to_iso8601utc(*self.expiration), + "conditions": conditions, + }); + + let encoded_policy = b64encode(policy.to_string()); + let signature = post_presign_v4(&encoded_policy, &secret_key, date, ®ion); + + let mut data: HashMap = HashMap::new(); + data.insert( + String::from("x-amz-algorithm"), + String::from(PostPolicy::ALGORITHM), + ); + data.insert(String::from("x-amz-credential"), credential); + data.insert(String::from("x-amz-date"), amz_date); + data.insert(String::from("policy"), encoded_policy); + data.insert(String::from("x-amz-signature"), signature); + if let Some(v) = session_token { + data.insert(String::from("x-amz-security-token"), v); + } + + Ok(data) + } +} + +pub struct DownloadObjectArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub object: &'a str, + pub version_id: Option<&'a str>, + pub ssec: Option<&'a SseCustomerKey>, + pub filename: &'a str, + pub overwrite: bool, +} + +impl<'a> DownloadObjectArgs<'a> { + pub fn new( + bucket_name: &'a str, + object_name: &'a str, + filename: &'a str, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + if object_name.is_empty() { + return Err(Error::InvalidObjectName(String::from( + "object name cannot be empty", + ))); + } + + Ok(DownloadObjectArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + object: object_name, + version_id: None, + ssec: None, + filename: filename, + overwrite: false, + }) + } +} + +pub struct UploadObjectArgs<'a> { + pub extra_headers: Option<&'a Multimap>, + pub extra_query_params: Option<&'a Multimap>, + pub region: Option<&'a str>, + pub bucket: &'a str, + pub object: &'a str, + pub headers: Option<&'a Multimap>, + pub user_metadata: Option<&'a Multimap>, + pub sse: Option<&'a dyn Sse>, + pub tags: Option<&'a HashMap>, + pub retention: Option<&'a Retention>, + pub legal_hold: bool, + pub object_size: Option, + pub part_size: usize, + pub part_count: i16, + pub content_type: &'a str, + pub filename: &'a str, +} + +impl<'a> UploadObjectArgs<'a> { + pub fn new( + bucket_name: &'a str, + object_name: &'a str, + filename: &'a str, + ) -> Result, Error> { + check_bucket_name(bucket_name, true)?; + + if object_name.is_empty() { + return Err(Error::InvalidObjectName(String::from( + "object name cannot be empty", + ))); + } + + let meta = std::fs::metadata(filename)?; + if !meta.is_file() { + return Err(Error::IOError(std::io::Error::new( + std::io::ErrorKind::Other, + "not a file", + ))); + } + + let object_size = Some(meta.len() as usize); + let (psize, part_count) = calc_part_info(object_size, None)?; + + Ok(UploadObjectArgs { + extra_headers: None, + extra_query_params: None, + region: None, + bucket: bucket_name, + object: object_name, + headers: None, + user_metadata: None, + sse: None, + tags: None, + retention: None, + legal_hold: false, + object_size: object_size, + part_size: psize, + part_count: part_count, + content_type: "application/octet-stream", + filename: filename, + }) + } +} diff --git a/src/s3/client.rs b/src/s3/client.rs index 7d27ffe..c8ec833 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -18,14 +18,15 @@ use crate::s3::creds::Provider; use crate::s3::error::{Error, ErrorResponse}; use 
crate::s3::http::{BaseUrl, Url}; use crate::s3::response::*; -use crate::s3::signer::sign_v4_s3; +use crate::s3::signer::{presign_v4, sign_v4_s3}; use crate::s3::sse::SseCustomerKey; use crate::s3::types::{ - Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationRecords, Part, SseConfig, + Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, + NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig, }; use crate::s3::utils::{ from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, - to_amz_date, urldecode, utc_now, Multimap, + to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap, }; use async_recursion::async_recursion; use bytes::{Buf, Bytes}; @@ -34,6 +35,7 @@ use hyper::http::Method; use reqwest::header::HeaderMap; use std::collections::{HashMap, VecDeque}; use std::fs::File; +use std::io::prelude::*; use std::io::Read; use xmltree::Element; @@ -90,18 +92,18 @@ fn parse_common_list_objects_response( ), Error, > { - let encoding_type = get_option_text(&root, "EncodingType")?; + let encoding_type = get_option_text(&root, "EncodingType"); let prefix = url_decode(&encoding_type, Some(get_default_text(&root, "Prefix")))?; Ok(( get_text(&root, "Name")?, encoding_type, prefix, - get_option_text(&root, "Delimiter")?, - match get_option_text(&root, "IsTruncated")? { + get_option_text(&root, "Delimiter"), + match get_option_text(&root, "IsTruncated") { Some(v) => v.to_lowercase() == "true", None => false, }, - match get_option_text(&root, "MaxKeys")? { + match get_option_text(&root, "MaxKeys") { Some(v) => Some(v.parse::()?), None => None, }, @@ -124,19 +126,19 @@ fn parse_list_objects_contents( let etype = encoding_type.as_ref().map(|v| v.clone()); let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap(); let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?); - let etag = get_option_text(&content, "ETag")?; + let etag = get_option_text(&content, "ETag"); let v = get_default_text(&content, "Size"); let size = match v.is_empty() { true => None, false => Some(v.parse::()?), }; - let storage_class = get_option_text(&content, "StorageClass")?; + let storage_class = get_option_text(&content, "StorageClass"); let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true"; - let version_id = get_option_text(&content, "VersionId")?; + let version_id = get_option_text(&content, "VersionId"); let (owner_id, owner_name) = match content.get_child("Owner") { Some(v) => ( - get_option_text(&v, "ID")?, - get_option_text(&v, "DisplayName")?, + get_option_text(&v, "ID"), + get_option_text(&v, "DisplayName"), ), None => (None, None), }; @@ -279,29 +281,26 @@ impl<'a> Client<'a> { let date = utc_now(); headers.insert(String::from("x-amz-date"), to_amz_date(date)); - match self.provider { - Some(p) => { - let creds = p.fetch(); - if creds.session_token.is_some() { - headers.insert( - String::from("X-Amz-Security-Token"), - creds.session_token.unwrap(), - ); - } - sign_v4_s3( - &method, - &url.path, - region, - headers, - query_params, - &creds.access_key, - &creds.secret_key, - &sha256, - date, + if let Some(p) = self.provider { + let creds = p.fetch(); + if creds.session_token.is_some() { + headers.insert( + String::from("X-Amz-Security-Token"), + creds.session_token.unwrap(), ); } - _ => todo!(), // Nothing to do for anonymous request - }; + sign_v4_s3( + &method, + &url.path, + region, + headers, + query_params, + &creds.access_key, + 
&creds.secret_key, + &sha256, + date, + ); + } } fn handle_redirect_response( @@ -516,7 +515,7 @@ impl<'a> Client<'a> { } } } - _ => todo!(), // Nothing to do. + _ => return Err(e), }; return Err(e); @@ -1230,7 +1229,7 @@ impl<'a> Client<'a> { pub async fn disable_object_legal_hold( &self, - args: DisableObjectLegalHoldArgs<'_>, + args: &DisableObjectLegalHoldArgs<'_>, ) -> Result { let region = self.get_region(&args.bucket, args.region).await?; @@ -1271,7 +1270,7 @@ impl<'a> Client<'a> { pub async fn delete_bucket_lifecycle( &self, - args: DeleteBucketLifecycleArgs<'_>, + args: &DeleteBucketLifecycleArgs<'_>, ) -> Result { let region = self.get_region(&args.bucket, args.region).await?; @@ -1304,18 +1303,267 @@ impl<'a> Client<'a> { bucket_name: args.bucket.to_string(), }) } - // DeleteBucketNotificationResponse DeleteBucketNotification( - // DeleteBucketNotificationArgs args); - // DeleteBucketPolicyResponse DeleteBucketPolicy(DeleteBucketPolicyArgs args); - // DeleteBucketReplicationResponse DeleteBucketReplication( - // DeleteBucketReplicationArgs args); - // DeleteBucketTagsResponse DeleteBucketTags(DeleteBucketTagsArgs args); - // DeleteObjectLockConfigResponse DeleteObjectLockConfig( - // DeleteObjectLockConfigArgs args); - // DeleteObjectTagsResponse DeleteObjectTags(DeleteObjectTagsArgs args); + + pub async fn delete_bucket_notification( + &self, + args: &DeleteBucketNotificationArgs<'_>, + ) -> Result { + self.set_bucket_notification(&SetBucketNotificationArgs { + extra_headers: args.extra_headers, + extra_query_params: args.extra_query_params, + region: args.region, + bucket: args.bucket, + config: &NotificationConfig { + cloud_func_config_list: None, + queue_config_list: None, + topic_config_list: None, + }, + }) + .await + } + + pub async fn delete_bucket_policy( + &self, + args: &DeleteBucketPolicyArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("policy"), String::new()); + + match self + .execute( + Method::DELETE, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await + { + Ok(resp) => Ok(DeleteBucketPolicyResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }), + Err(e) => match e { + Error::S3Error(ref err) => { + if err.code == "NoSuchBucketPolicy" { + return Ok(DeleteBucketPolicyResponse { + headers: HeaderMap::new(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }); + } + return Err(e); + } + _ => return Err(e), + }, + } + } + + pub async fn delete_bucket_replication( + &self, + args: &DeleteBucketReplicationArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("replication"), String::new()); + + match self + .execute( + Method::DELETE, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await + { + Ok(resp) => Ok(DeleteBucketReplicationResponse { + headers: resp.headers().clone(), + region: region.clone(), + 
bucket_name: args.bucket.to_string(), + }), + Err(e) => match e { + Error::S3Error(ref err) => { + if err.code == "ReplicationConfigurationNotFoundError" { + return Ok(DeleteBucketReplicationResponse { + headers: HeaderMap::new(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }); + } + return Err(e); + } + _ => return Err(e), + }, + } + } + + pub async fn delete_bucket_tags( + &self, + args: &DeleteBucketTagsArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("tagging"), String::new()); + + let resp = self + .execute( + Method::DELETE, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + + Ok(DeleteBucketTagsResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn delete_object_lock_config( + &self, + args: &DeleteObjectLockConfigArgs<'_>, + ) -> Result { + self.set_object_lock_config(&SetObjectLockConfigArgs { + extra_headers: args.extra_headers, + extra_query_params: args.extra_query_params, + region: args.region, + bucket: args.bucket, + config: &ObjectLockConfig { + retention_mode: None, + retention_duration_days: None, + retention_duration_years: None, + }, + }) + .await + } + + pub async fn delete_object_tags( + &self, + args: &DeleteObjectTagsArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + query_params.insert(String::from("tagging"), String::new()); + + let resp = self + .execute( + Method::DELETE, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + Some(&args.object), + None, + ) + .await?; + + Ok(DeleteObjectTagsResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + }) + } + + pub async fn download_object( + &self, + args: &DownloadObjectArgs<'_>, + ) -> Result { + let mut resp = self + .get_object(&GetObjectArgs { + extra_headers: args.extra_headers, + extra_query_params: args.extra_query_params, + region: args.region, + bucket: args.bucket, + object: args.object, + version_id: args.version_id, + ssec: args.ssec, + offset: None, + length: None, + match_etag: None, + not_match_etag: None, + modified_since: None, + unmodified_since: None, + }) + .await?; + + let mut file = match args.overwrite { + true => File::create(args.filename)?, + false => File::options() + .write(true) + .truncate(true) + .create_new(true) + .open(args.filename)?, + }; + while let Some(v) = resp.chunk().await? 
{ + file.write_all(&v)?; + } + file.sync_all()?; + + Ok(DownloadObjectResponse { + headers: resp.headers().clone(), + region: args.region.map_or(String::new(), |v| String::from(v)), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + }) + } + pub async fn enable_object_legal_hold( &self, - args: EnableObjectLegalHoldArgs<'_>, + args: &EnableObjectLegalHoldArgs<'_>, ) -> Result { let region = self.get_region(&args.bucket, args.region).await?; @@ -1401,14 +1649,14 @@ impl<'a> Client<'a> { bucket_name: args.bucket.to_string(), config: SseConfig { sse_algorithm: get_text(sse_by_default, "SSEAlgorithm")?, - kms_master_key_id: get_option_text(sse_by_default, "KMSMasterKeyID")?, + kms_master_key_id: get_option_text(sse_by_default, "KMSMasterKeyID"), }, }) } pub async fn get_bucket_lifecycle( &self, - args: GetBucketLifecycleArgs<'_>, + args: &GetBucketLifecycleArgs<'_>, ) -> Result { let region = self.get_region(&args.bucket, args.region).await?; @@ -1463,13 +1711,253 @@ impl<'a> Client<'a> { }, } } - // GetBucketNotificationResponse GetBucketNotification( - // GetBucketNotificationArgs args); - // GetBucketPolicyResponse GetBucketPolicy(GetBucketPolicyArgs args); - // GetBucketReplicationResponse GetBucketReplication( - // GetBucketReplicationArgs args); - // GetBucketTagsResponse GetBucketTags(GetBucketTagsArgs args); - // GetBucketVersioningResponse GetBucketVersioning(GetBucketVersioningArgs args); + + pub async fn get_bucket_notification( + &self, + args: &GetBucketNotificationArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("notification"), String::new()); + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + return Ok(GetBucketNotificationResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + config: NotificationConfig::from_xml(&mut root)?, + }); + } + + pub async fn get_bucket_policy( + &self, + args: &GetBucketPolicyArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("policy"), String::new()); + + match self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await + { + Ok(resp) => { + return Ok(GetBucketPolicyResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + config: resp.text().await?, + }) + } + Err(e) => match e { + Error::S3Error(ref err) => { + if err.code == "NoSuchBucketPolicy" { + return Ok(GetBucketPolicyResponse { + headers: HeaderMap::new(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + config: String::from("{}"), + }); + } + return Err(e); + } + _ => return 
Err(e), + }, + } + } + + pub async fn get_bucket_replication( + &self, + args: &GetBucketReplicationArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("replication"), String::new()); + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let root = Element::parse(body.reader())?; + + return Ok(GetBucketReplicationResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + config: ReplicationConfig::from_xml(&root)?, + }); + } + + pub async fn get_bucket_tags( + &self, + args: &GetBucketTagsArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("tagging"), String::new()); + + match self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await + { + Ok(resp) => { + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let element = root + .get_mut_child("TagSet") + .ok_or(Error::XmlError(format!(" tag not found")))?; + let mut tags = std::collections::HashMap::new(); + loop { + match element.take_child("Tag") { + Some(v) => tags.insert(get_text(&v, "Key")?, get_text(&v, "Value")?), + _ => break, + }; + } + + return Ok(GetBucketTagsResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + tags: tags, + }); + } + Err(e) => match e { + Error::S3Error(ref err) => { + if err.code == "NoSuchTagSet" { + return Ok(GetBucketTagsResponse { + headers: HeaderMap::new(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + tags: HashMap::new(), + }); + } + return Err(e); + } + _ => return Err(e), + }, + } + } + + pub async fn get_bucket_versioning( + &self, + args: &GetBucketVersioningArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("versioning"), String::new()); + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let root = Element::parse(body.reader())?; + + return Ok(GetBucketVersioningResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + status: get_option_text(&root, "Status").map(|v| v == "Enabled"), + mfa_delete: get_option_text(&root, "MFADelete").map(|v| v == "Enabled"), + }); + } pub async fn get_object(&self, args: &GetObjectArgs<'_>) -> Result { 
if args.ssec.is_some() && !self.base_url.https { @@ -1504,15 +1992,257 @@ impl<'a> Client<'a> { .await } - // GetObjectLockConfigResponse GetObjectLockConfig(GetObjectLockConfigArgs args); - // GetObjectRetentionResponse GetObjectRetention(GetObjectRetentionArgs args); - // GetObjectTagsResponse GetObjectTags(GetObjectTagsArgs args); - // GetPresignedObjectUrlResponse GetPresignedObjectUrl( - // GetPresignedObjectUrlArgs args); - // GetPresignedPostFormDataResponse GetPresignedPostFormData(PostPolicy policy); + pub async fn get_object_lock_config( + &self, + args: &GetObjectLockConfigArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("object-lock"), String::new()); + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let root = Element::parse(body.reader())?; + + return Ok(GetObjectLockConfigResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + config: ObjectLockConfig::from_xml(&root)?, + }); + } + + pub async fn get_object_retention( + &self, + args: &GetObjectRetentionArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + query_params.insert(String::from("retention"), String::new()); + + match self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + Some(&args.object), + None, + ) + .await + { + Ok(resp) => { + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let root = Element::parse(body.reader())?; + + return Ok(GetObjectRetentionResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + retention_mode: match get_option_text(&root, "Mode") { + Some(v) => Some(RetentionMode::parse(&v)?), + _ => None, + }, + retain_until_date: match get_option_text(&root, "RetainUntilDate") { + Some(v) => Some(from_iso8601utc(&v)?), + _ => None, + }, + }); + } + Err(e) => match e { + Error::S3Error(ref err) => { + if err.code == "NoSuchObjectLockConfiguration" { + return Ok(GetObjectRetentionResponse { + headers: HeaderMap::new(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + retention_mode: None, + retain_until_date: None, + }); + } + return Err(e); + } + _ => return Err(e), + }, + } + } + + pub async fn get_object_tags( + &self, + args: &GetObjectTagsArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut 
query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + query_params.insert(String::from("tagging"), String::new()); + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + Some(&args.object), + None, + ) + .await?; + + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let element = root + .get_mut_child("TagSet") + .ok_or(Error::XmlError(format!(" tag not found")))?; + let mut tags = std::collections::HashMap::new(); + loop { + match element.take_child("Tag") { + Some(v) => tags.insert(get_text(&v, "Key")?, get_text(&v, "Value")?), + _ => break, + }; + } + + return Ok(GetObjectTagsResponse { + headers: header_map.clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + tags: tags, + }); + } + + pub async fn get_presigned_object_url( + &self, + args: &GetPresignedObjectUrlArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + + let mut url = self.base_url.build_url( + &args.method, + ®ion, + &query_params, + Some(args.bucket), + Some(args.object), + )?; + + if let Some(p) = self.provider { + let creds = p.fetch(); + if let Some(t) = creds.session_token { + query_params.insert(String::from("X-Amz-Security-Token"), t); + } + + let date = match args.request_time { + Some(v) => v, + _ => utc_now(), + }; + + presign_v4( + &args.method, + &url.host, + &url.path, + ®ion, + &mut query_params, + &creds.access_key, + &creds.secret_key, + date, + args.expiry_seconds.unwrap_or(DEFAULT_EXPIRY_SECONDS), + ); + + url.query = query_params; + } + + return Ok(GetPresignedObjectUrlResponse { + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + url: url.to_string(), + }); + } + + pub async fn get_presigned_post_form_data( + &self, + policy: &PostPolicy<'_>, + ) -> Result, Error> { + if self.provider.is_none() { + return Err(Error::PostPolicyError(format!( + "anonymous access does not require presigned post form-data" + ))); + } + + let region = self.get_region(&policy.bucket, policy.region).await?; + let creds = self.provider.unwrap().fetch(); + policy.form_data( + creds.access_key, + creds.secret_key, + creds.session_token, + region, + ) + } + pub async fn is_object_legal_hold_enabled( &self, - args: IsObjectLegalHoldEnabledArgs<'_>, + args: &IsObjectLegalHoldEnabledArgs<'_>, ) -> Result { let region = self.get_region(&args.bucket, args.region).await?; @@ -1755,8 +2485,8 @@ impl<'a> Client<'a> { let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = parse_common_list_objects_response(&root)?; - let marker = url_decode(&encoding_type, get_option_text(&root, "Marker")?)?; - let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker")?)?; + let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?; + let mut next_marker = url_decode(&encoding_type, 
get_option_text(&root, "NextMarker"))?; let mut contents: Vec = Vec::new(); parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; if is_truncated && next_marker.is_none() { @@ -1834,7 +2564,7 @@ impl<'a> Client<'a> { let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = parse_common_list_objects_response(&root)?; - let text = get_option_text(&root, "KeyCount")?; + let text = get_option_text(&root, "KeyCount"); let key_count = match text { Some(v) => match v.is_empty() { true => None, @@ -1842,9 +2572,9 @@ impl<'a> Client<'a> { }, None => None, }; - let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter")?)?; - let continuation_token = get_option_text(&root, "ContinuationToken")?; - let next_continuation_token = get_option_text(&root, "NextContinuationToken")?; + let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?; + let continuation_token = get_option_text(&root, "ContinuationToken"); + let next_continuation_token = get_option_text(&root, "NextContinuationToken"); let mut contents: Vec = Vec::new(); parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; @@ -1912,10 +2642,10 @@ impl<'a> Client<'a> { let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = parse_common_list_objects_response(&root)?; - let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker")?)?; - let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker")?)?; - let version_id_marker = get_option_text(&root, "VersionIdMarker")?; - let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker")?; + let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?; + let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?; + let version_id_marker = get_option_text(&root, "VersionIdMarker"); + let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker"); let mut contents: Vec = Vec::new(); parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?; parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; @@ -2454,9 +3184,9 @@ impl<'a> Client<'a> { objects.push(DeletedObject { name: get_text(&deleted, "Key")?, - version_id: get_option_text(&deleted, "VersionId")?, + version_id: get_option_text(&deleted, "VersionId"), delete_marker: get_text(&deleted, "DeleteMarker")?.to_lowercase() == "true", - delete_marker_version_id: get_option_text(&deleted, "DeleteMarkerVersionId")?, + delete_marker_version_id: get_option_text(&deleted, "DeleteMarkerVersionId"), }) } @@ -2471,7 +3201,7 @@ impl<'a> Client<'a> { code: get_text(&error, "Code")?, message: get_text(&error, "Message")?, object_name: get_text(&error, "Key")?, - version_id: get_option_text(&error, "VersionId")?, + version_id: get_option_text(&error, "VersionId"), }) } @@ -2558,7 +3288,7 @@ impl<'a> Client<'a> { pub async fn set_bucket_lifecycle( &self, - args: SetBucketLifecycleArgs<'_>, + args: &SetBucketLifecycleArgs<'_>, ) -> Result { let region = self.get_region(&args.bucket, args.region).await?; @@ -2591,16 +3321,377 @@ impl<'a> Client<'a> { bucket_name: args.bucket.to_string(), }) } - // SetBucketNotificationResponse SetBucketNotification( - // SetBucketNotificationArgs args); - // SetBucketPolicyResponse SetBucketPolicy(SetBucketPolicyArgs args); - // 
SetBucketReplicationResponse SetBucketReplication( - // SetBucketReplicationArgs args); - // SetBucketTagsResponse SetBucketTags(SetBucketTagsArgs args); - // SetBucketVersioningResponse SetBucketVersioning(SetBucketVersioningArgs args); - // SetObjectLockConfigResponse SetObjectLockConfig(SetObjectLockConfigArgs args); - // SetObjectRetentionResponse SetObjectRetention(SetObjectRetentionArgs args); - // SetObjectTagsResponse SetObjectTags(SetObjectTagsArgs args); + + pub async fn set_bucket_notification( + &self, + args: &SetBucketNotificationArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("notification"), String::new()); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + Some(args.config.to_xml().as_bytes()), + ) + .await?; + + Ok(SetBucketNotificationResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn set_bucket_policy( + &self, + args: &SetBucketPolicyArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("policy"), String::new()); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + Some(args.config.as_bytes()), + ) + .await?; + + Ok(SetBucketPolicyResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn set_bucket_replication( + &self, + args: &SetBucketReplicationArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("replication"), String::new()); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + Some(args.config.to_xml().as_bytes()), + ) + .await?; + + Ok(SetBucketReplicationResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn set_bucket_tags( + &self, + args: &SetBucketTagsArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("tagging"), String::new()); + + let mut data = String::from(""); + if !args.tags.is_empty() { + data.push_str(""); + for (key, value) in args.tags.iter() { + data.push_str(""); + data.push_str(""); + data.push_str(&key); + data.push_str(""); + data.push_str(""); + data.push_str(&value); + 
data.push_str(""); + data.push_str(""); + } + data.push_str(""); + } + data.push_str(""); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + Some(data.as_bytes()), + ) + .await?; + + Ok(SetBucketTagsResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn set_bucket_versioning( + &self, + args: &SetBucketVersioningArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("versioning"), String::new()); + + let mut data = String::from(""); + data.push_str(""); + data.push_str(match args.status { + true => "Enabled", + false => "Suspended", + }); + data.push_str(""); + if let Some(v) = args.mfa_delete { + data.push_str(""); + data.push_str(match v { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + } + data.push_str(""); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + Some(data.as_bytes()), + ) + .await?; + + Ok(SetBucketVersioningResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn set_object_lock_config( + &self, + args: &SetObjectLockConfigArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("object-lock"), String::new()); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + Some(args.config.to_xml().as_bytes()), + ) + .await?; + + Ok(SetObjectLockConfigResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + }) + } + + pub async fn set_object_retention( + &self, + args: &SetObjectRetentionArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + if args.bypass_governance_mode { + headers.insert( + String::from("x-amz-bypass-governance-retention"), + String::from("true"), + ); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + query_params.insert(String::from("retention"), String::new()); + + let mut data = String::from(""); + if let Some(v) = &args.retention_mode { + data.push_str(""); + data.push_str(&v.to_string()); + data.push_str(""); + } + if let Some(v) = &args.retain_until_date { + data.push_str(""); + data.push_str(&to_iso8601utc(*v)); + data.push_str(""); + } + data.push_str(""); + + headers.insert(String::from("Content-MD5"), md5sum_hash(&data.as_bytes())); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + Some(&args.object), + 
Some(data.as_bytes()), + ) + .await?; + + Ok(SetObjectRetentionResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + }) + } + + pub async fn set_object_tags( + &self, + args: &SetObjectTagsArgs<'_>, + ) -> Result { + let region = self.get_region(&args.bucket, args.region).await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = args.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + query_params.insert(String::from("tagging"), String::new()); + + let mut data = String::from(""); + if !args.tags.is_empty() { + data.push_str(""); + for (key, value) in args.tags.iter() { + data.push_str(""); + data.push_str(""); + data.push_str(&key); + data.push_str(""); + data.push_str(""); + data.push_str(&value); + data.push_str(""); + data.push_str(""); + } + data.push_str(""); + } + data.push_str(""); + + let resp = self + .execute( + Method::PUT, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + Some(&args.object), + Some(data.as_bytes()), + ) + .await?; + + Ok(SetObjectTagsResponse { + headers: resp.headers().clone(), + region: region.clone(), + bucket_name: args.bucket.to_string(), + object_name: args.object.to_string(), + version_id: args.version_id.as_ref().map(|v| v.to_string()), + }) + } + pub async fn select_object_content( &self, args: &SelectObjectContentArgs<'_>, @@ -2683,6 +3774,33 @@ impl<'a> Client<'a> { StatObjectResponse::new(&resp.headers(), ®ion, &args.bucket, &args.object) } + pub async fn upload_object( + &self, + args: &UploadObjectArgs<'_>, + ) -> Result { + let mut file = File::open(args.filename)?; + + self.put_object(&mut PutObjectArgs { + extra_headers: args.extra_headers, + extra_query_params: args.extra_query_params, + region: args.region, + bucket: args.bucket, + object: args.object, + headers: args.headers, + user_metadata: args.user_metadata, + sse: args.sse, + tags: args.tags, + retention: args.retention, + legal_hold: args.legal_hold, + object_size: args.object_size, + part_size: args.part_size, + part_count: args.part_count, + content_type: args.content_type, + stream: &mut file, + }) + .await + } + pub async fn upload_part( &self, args: &UploadPartArgs<'_>, diff --git a/src/s3/error.rs b/src/s3/error.rs index ab2f177..d44dca5 100644 --- a/src/s3/error.rs +++ b/src/s3/error.rs @@ -102,6 +102,8 @@ pub enum Error { InvalidDateAndDays(String), InvalidLifecycleRuleId, InvalidFilter, + PostPolicyError(String), + InvalidObjectLockConfig(String), } impl std::error::Error for Error {} @@ -160,6 +162,8 @@ impl fmt::Display for Error { Error::InvalidDateAndDays(m) => write!(f, "Only one of date or days of {} must be set", m), Error::InvalidLifecycleRuleId => write!(f, "id must be exceed 255 characters"), Error::InvalidFilter => write!(f, "only one of And, Prefix or Tag must be provided"), + Error::PostPolicyError(m) => write!(f, "{}", m), + Error::InvalidObjectLockConfig(m) => write!(f, "{}", m), } } } diff --git a/src/s3/response.rs b/src/s3/response.rs index b70196a..b9f0504 100644 --- a/src/s3/response.rs +++ b/src/s3/response.rs @@ -15,7 +15,8 @@ use crate::s3::error::Error; use crate::s3::types::{ - parse_legal_hold, Bucket, Item, LifecycleConfig, RetentionMode, 
SelectProgress, SseConfig, + parse_legal_hold, Bucket, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, + ReplicationConfig, RetentionMode, SelectProgress, SseConfig, }; use crate::s3::utils::{ copy_slice, crc32, from_http_header_value, from_iso8601utc, get_text, uint32, UtcTime, @@ -91,6 +92,8 @@ pub type CopyObjectResponse = PutObjectApiResponse; pub type ComposeObjectResponse = PutObjectApiResponse; +pub type UploadObjectResponse = PutObjectApiResponse; + #[derive(Debug)] pub struct StatObjectResponse { pub headers: HeaderMap, @@ -603,6 +606,7 @@ impl SelectObjectContentResponse { } } +#[derive(Clone, Debug)] pub struct ListenBucketNotificationResponse { pub headers: HeaderMap, pub region: String, @@ -625,6 +629,7 @@ impl ListenBucketNotificationResponse { pub type DeleteBucketEncryptionResponse = BucketResponse; +#[derive(Clone, Debug)] pub struct GetBucketEncryptionResponse { pub headers: HeaderMap, pub region: String, @@ -638,6 +643,7 @@ pub type EnableObjectLegalHoldResponse = ObjectResponse; pub type DisableObjectLegalHoldResponse = ObjectResponse; +#[derive(Clone, Debug)] pub struct IsObjectLegalHoldEnabledResponse { pub headers: HeaderMap, pub region: String, @@ -649,6 +655,7 @@ pub struct IsObjectLegalHoldEnabledResponse { pub type DeleteBucketLifecycleResponse = BucketResponse; +#[derive(Clone, Debug)] pub struct GetBucketLifecycleResponse { pub headers: HeaderMap, pub region: String, @@ -657,3 +664,119 @@ pub struct GetBucketLifecycleResponse { } pub type SetBucketLifecycleResponse = BucketResponse; + +pub type DeleteBucketNotificationResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetBucketNotificationResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub config: NotificationConfig, +} + +pub type SetBucketNotificationResponse = BucketResponse; + +pub type DeleteBucketPolicyResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetBucketPolicyResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub config: String, +} + +pub type SetBucketPolicyResponse = BucketResponse; + +pub type DeleteBucketReplicationResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetBucketReplicationResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub config: ReplicationConfig, +} + +pub type SetBucketReplicationResponse = BucketResponse; + +pub type DeleteBucketTagsResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetBucketTagsResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub tags: std::collections::HashMap, +} + +pub type SetBucketTagsResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetBucketVersioningResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub status: Option, + pub mfa_delete: Option, +} + +pub type SetBucketVersioningResponse = BucketResponse; + +pub type DeleteObjectLockConfigResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetObjectLockConfigResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub config: ObjectLockConfig, +} + +pub type SetObjectLockConfigResponse = BucketResponse; + +#[derive(Clone, Debug)] +pub struct GetObjectRetentionResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub object_name: String, + pub version_id: Option, + pub retention_mode: Option, + pub retain_until_date: Option, +} + +pub type 
SetObjectRetentionResponse = ObjectResponse; + +pub type DeleteObjectTagsResponse = ObjectResponse; + +#[derive(Clone, Debug)] +pub struct GetObjectTagsResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub object_name: String, + pub version_id: Option, + pub tags: std::collections::HashMap, +} + +pub type SetObjectTagsResponse = ObjectResponse; + +#[derive(Clone, Debug)] +pub struct GetPresignedObjectUrlResponse { + pub region: String, + pub bucket_name: String, + pub object_name: String, + pub version_id: Option, + pub url: String, +} + +#[derive(Clone, Debug)] +pub struct DownloadObjectResponse { + pub headers: HeaderMap, + pub region: String, + pub bucket_name: String, + pub object_name: String, + pub version_id: Option, +} diff --git a/src/s3/types.rs b/src/s3/types.rs index 75050c3..820b01f 100644 --- a/src/s3/types.rs +++ b/src/s3/types.rs @@ -14,7 +14,9 @@ // limitations under the License. use crate::s3::error::Error; -use crate::s3::utils::{from_iso8601utc, get_default_text, get_text, to_iso8601utc, UtcTime}; +use crate::s3::utils::{ + from_iso8601utc, get_default_text, get_option_text, get_text, to_iso8601utc, UtcTime, +}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; @@ -51,15 +53,15 @@ pub struct Part { #[derive(Clone, Debug)] pub enum RetentionMode { - Governance, - Compliance, + GOVERNANCE, + COMPLIANCE, } impl RetentionMode { pub fn parse(s: &str) -> Result { match s { - "GOVERNANCE" => Ok(RetentionMode::Governance), - "COMPLIANCE" => Ok(RetentionMode::Compliance), + "GOVERNANCE" => Ok(RetentionMode::GOVERNANCE), + "COMPLIANCE" => Ok(RetentionMode::COMPLIANCE), _ => Err(Error::InvalidRetentionMode(s.to_string())), } } @@ -68,8 +70,8 @@ impl RetentionMode { impl fmt::Display for RetentionMode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - RetentionMode::Governance => write!(f, "GOVERNANCE"), - RetentionMode::Compliance => write!(f, "COMPLIANCE"), + RetentionMode::GOVERNANCE => write!(f, "GOVERNANCE"), + RetentionMode::COMPLIANCE => write!(f, "COMPLIANCE"), } } } @@ -653,7 +655,7 @@ pub struct Filter { } impl Filter { - pub fn parse_xml(element: &Element) -> Result { + pub fn from_xml(element: &Element) -> Result { let and_operator = match element.get_child("And") { Some(v) => Some(AndOperator { prefix: match v.get_child("Prefix") { @@ -711,6 +713,49 @@ impl Filter { } return Err(Error::InvalidFilter); } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + if self.and_operator.is_some() { + data.push_str(""); + if self.and_operator.as_ref().unwrap().prefix.is_some() { + data.push_str(""); + data.push_str(&self.and_operator.as_ref().unwrap().prefix.as_ref().unwrap()); + data.push_str(""); + } + if self.and_operator.as_ref().unwrap().tags.is_some() { + for (key, value) in self.and_operator.as_ref().unwrap().tags.as_ref().unwrap() { + data.push_str(""); + data.push_str(""); + data.push_str(&key); + data.push_str(""); + data.push_str(""); + data.push_str(&value); + data.push_str(""); + data.push_str(""); + } + } + data.push_str(""); + } + if self.prefix.is_some() { + data.push_str(""); + data.push_str(&self.prefix.as_ref().unwrap()); + data.push_str(""); + } + if self.tag.is_some() { + data.push_str(""); + data.push_str(""); + data.push_str(&self.tag.as_ref().unwrap().key); + data.push_str(""); + data.push_str(""); + data.push_str(&self.tag.as_ref().unwrap().value); + data.push_str(""); + data.push_str(""); + } + data.push_str(""); + + return data; + } } 
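Since `Filter` now serializes itself, a small usage sketch may help reviewers. It assumes `Filter` and `AndOperator` are exported from `minio::s3::types` with the public fields that `to_xml` reads above; the XML in the comment illustrates the expected shape rather than a captured fixture.

```rust
use std::collections::HashMap;

use minio::s3::types::{AndOperator, Filter};

fn main() {
    // A filter that uses only the <And> form, combining a prefix with one tag,
    // which is the "only one of And, Prefix or Tag" shape the parser accepts.
    let filter = Filter {
        and_operator: Some(AndOperator {
            prefix: Some(String::from("logs/")),
            tags: Some(HashMap::from([(
                String::from("retention"),
                String::from("short"),
            )])),
        }),
        prefix: None,
        tag: None,
    };

    // Expected shape (illustrative):
    // <Filter><And><Prefix>logs/</Prefix>
    //   <Tag><Key>retention</Key><Value>short</Value></Tag></And></Filter>
    println!("{}", filter.to_xml());
}
```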
#[derive(Clone, Debug)] @@ -764,7 +809,7 @@ impl LifecycleRule { Some(v) => Some(get_text(v, "ExpiredObjectDeleteMarker")?.to_lowercase() == "true"), None => None, }, - filter: Filter::parse_xml( + filter: Filter::from_xml( element .get_child("Filter") .ok_or(Error::XmlError(format!(" tag not found")))?, @@ -859,17 +904,14 @@ impl LifecycleConfig { pub fn from_xml(root: &Element) -> Result { let mut config = LifecycleConfig { rules: Vec::new() }; - match root.get_child("Rule") { - Some(v) => { - for rule in &v.children { - config.rules.push(LifecycleRule::from_xml( - rule.as_element() - .ok_or(Error::XmlError(format!(" tag not found")))?, - )?); - } + if let Some(v) = root.get_child("Rule") { + for rule in &v.children { + config.rules.push(LifecycleRule::from_xml( + rule.as_element() + .ok_or(Error::XmlError(format!(" tag not found")))?, + )?); } - _ => todo!(), - }; + } return Ok(config); } @@ -930,61 +972,7 @@ impl LifecycleConfig { data.push_str(""); } - data.push_str(""); - if rule.filter.and_operator.is_some() { - data.push_str(""); - if rule.filter.and_operator.as_ref().unwrap().prefix.is_some() { - data.push_str(""); - data.push_str( - &rule - .filter - .and_operator - .as_ref() - .unwrap() - .prefix - .as_ref() - .unwrap(), - ); - data.push_str(""); - } - if rule.filter.and_operator.as_ref().unwrap().tags.is_some() { - for (key, value) in rule - .filter - .and_operator - .as_ref() - .unwrap() - .tags - .as_ref() - .unwrap() - { - data.push_str(""); - data.push_str(""); - data.push_str(&key); - data.push_str(""); - data.push_str(""); - data.push_str(&value); - data.push_str(""); - data.push_str(""); - } - } - data.push_str(""); - } - if rule.filter.prefix.is_some() { - data.push_str(""); - data.push_str(&rule.filter.prefix.as_ref().unwrap()); - data.push_str(""); - } - if rule.filter.tag.is_some() { - data.push_str(""); - data.push_str(""); - data.push_str(&rule.filter.tag.as_ref().unwrap().key); - data.push_str(""); - data.push_str(""); - data.push_str(&rule.filter.tag.as_ref().unwrap().value); - data.push_str(""); - data.push_str(""); - } - data.push_str(""); + data.push_str(&rule.filter.to_xml()); if !rule.id.is_empty() { data.push_str(""); @@ -1069,3 +1057,818 @@ impl LifecycleConfig { return data; } } + +fn parse_common_notification_config( + element: &mut Element, +) -> Result< + ( + Vec, + Option, + Option, + Option, + ), + Error, +> { + let mut events = Vec::new(); + loop { + match element.take_child("Event") { + Some(v) => events.push( + v.get_text() + .ok_or(Error::XmlError(format!("text of tag not found")))? 
+ .to_string(), + ), + _ => break, + } + } + + let id = get_option_text(element, "Id"); + + let (prefix_filter_rule, suffix_filter_rule) = match element.get_child("Filter") { + Some(filter) => { + let mut prefix = None; + let mut suffix = None; + let rules = filter + .get_child("S3Key") + .ok_or(Error::XmlError(format!(" tag not found")))?; + for rule in &rules.children { + let v = rule + .as_element() + .ok_or(Error::XmlError(format!(" tag not found")))?; + let name = get_text(v, "Name")?; + let value = get_text(v, "Value")?; + if PrefixFilterRule::NAME == name { + prefix = Some(PrefixFilterRule { value: value }); + } else { + suffix = Some(SuffixFilterRule { value: value }); + } + } + (prefix, suffix) + } + _ => (None, None), + }; + + Ok((events, id, prefix_filter_rule, suffix_filter_rule)) +} + +fn to_xml_common_notification_config( + events: &Vec, + id: &Option, + prefix_filter_rule: &Option, + suffix_filter_rule: &Option, +) -> String { + let mut data = String::new(); + + for event in events { + data.push_str(""); + data.push_str(&event); + data.push_str(""); + } + + if let Some(v) = id { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + + if prefix_filter_rule.is_some() || suffix_filter_rule.is_some() { + data.push_str(""); + + if let Some(v) = prefix_filter_rule { + data.push_str("prefix"); + data.push_str(""); + data.push_str(&v.value); + data.push_str(""); + } + + if let Some(v) = suffix_filter_rule { + data.push_str("suffix"); + data.push_str(""); + data.push_str(&v.value); + data.push_str(""); + } + + data.push_str(""); + } + + return data; +} + +#[derive(Clone, Debug)] +pub struct PrefixFilterRule { + pub value: String, +} + +impl PrefixFilterRule { + pub const NAME: &str = "prefix"; +} + +#[derive(Clone, Debug)] +pub struct SuffixFilterRule { + pub value: String, +} + +impl SuffixFilterRule { + pub const NAME: &str = "suffix"; +} + +#[derive(Clone, Debug)] +pub struct CloudFuncConfig { + pub events: Vec, + pub id: Option, + pub prefix_filter_rule: Option, + pub suffix_filter_rule: Option, + pub cloud_func: String, +} + +impl CloudFuncConfig { + pub fn from_xml(element: &mut Element) -> Result { + let (events, id, prefix_filter_rule, suffix_filter_rule) = + parse_common_notification_config(element)?; + Ok(CloudFuncConfig { + events: events, + id: id, + prefix_filter_rule: prefix_filter_rule, + suffix_filter_rule: suffix_filter_rule, + cloud_func: get_text(element, "CloudFunction")?, + }) + } + + pub fn validate(&self) -> Result<(), Error> { + if self.events.len() != 0 && self.cloud_func != "" { + return Ok(()); + } + + return Err(Error::InvalidFilter); + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + + data.push_str(""); + data.push_str(&self.cloud_func); + data.push_str(""); + + data.push_str(&to_xml_common_notification_config( + &self.events, + &self.id, + &self.prefix_filter_rule, + &self.suffix_filter_rule, + )); + + data.push_str(""); + + return data; + } +} + +#[derive(Clone, Debug)] +pub struct QueueConfig { + pub events: Vec, + pub id: Option, + pub prefix_filter_rule: Option, + pub suffix_filter_rule: Option, + pub queue: String, +} + +impl QueueConfig { + pub fn from_xml(element: &mut Element) -> Result { + let (events, id, prefix_filter_rule, suffix_filter_rule) = + parse_common_notification_config(element)?; + Ok(QueueConfig { + events: events, + id: id, + prefix_filter_rule: prefix_filter_rule, + suffix_filter_rule: suffix_filter_rule, + queue: get_text(element, "Queue")?, + }) + } + + pub fn validate(&self) -> 
Result<(), Error> { + if self.events.len() != 0 && self.queue != "" { + return Ok(()); + } + + return Err(Error::InvalidFilter); + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + + data.push_str(""); + data.push_str(&self.queue); + data.push_str(""); + + data.push_str(&to_xml_common_notification_config( + &self.events, + &self.id, + &self.prefix_filter_rule, + &self.suffix_filter_rule, + )); + + data.push_str(""); + + return data; + } +} + +#[derive(Clone, Debug)] +pub struct TopicConfig { + pub events: Vec, + pub id: Option, + pub prefix_filter_rule: Option, + pub suffix_filter_rule: Option, + pub topic: String, +} + +impl TopicConfig { + pub fn from_xml(element: &mut Element) -> Result { + let (events, id, prefix_filter_rule, suffix_filter_rule) = + parse_common_notification_config(element)?; + Ok(TopicConfig { + events: events, + id: id, + prefix_filter_rule: prefix_filter_rule, + suffix_filter_rule: suffix_filter_rule, + topic: get_text(element, "Topic")?, + }) + } + + pub fn validate(&self) -> Result<(), Error> { + if self.events.len() != 0 && self.topic != "" { + return Ok(()); + } + + return Err(Error::InvalidFilter); + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + + data.push_str(""); + data.push_str(&self.topic); + data.push_str(""); + + data.push_str(&to_xml_common_notification_config( + &self.events, + &self.id, + &self.prefix_filter_rule, + &self.suffix_filter_rule, + )); + + data.push_str(""); + + return data; + } +} + +#[derive(Clone, Debug)] +pub struct NotificationConfig { + pub cloud_func_config_list: Option>, + pub queue_config_list: Option>, + pub topic_config_list: Option>, +} + +impl NotificationConfig { + pub fn from_xml(root: &mut Element) -> Result { + let mut config = NotificationConfig { + cloud_func_config_list: None, + queue_config_list: None, + topic_config_list: None, + }; + + let mut cloud_func_config_list = Vec::new(); + loop { + match root.take_child("CloudFunctionConfiguration") { + Some(mut v) => cloud_func_config_list.push(CloudFuncConfig::from_xml(&mut v)?), + _ => break, + } + } + if cloud_func_config_list.len() != 0 { + config.cloud_func_config_list = Some(cloud_func_config_list); + } + + let mut queue_config_list = Vec::new(); + loop { + match root.take_child("QueueConfiguration") { + Some(mut v) => queue_config_list.push(QueueConfig::from_xml(&mut v)?), + _ => break, + } + } + if queue_config_list.len() != 0 { + config.queue_config_list = Some(queue_config_list); + } + + let mut topic_config_list = Vec::new(); + loop { + match root.take_child("TopicConfiguration") { + Some(mut v) => topic_config_list.push(TopicConfig::from_xml(&mut v)?), + _ => break, + } + } + if topic_config_list.len() != 0 { + config.topic_config_list = Some(topic_config_list); + } + + return Ok(config); + } + + pub fn validate(&self) -> Result<(), Error> { + if let Some(v) = &self.cloud_func_config_list { + for rule in v { + rule.validate()?; + } + } + + if let Some(v) = &self.queue_config_list { + for rule in v { + rule.validate()?; + } + } + + if let Some(v) = &self.topic_config_list { + for rule in v { + rule.validate()?; + } + } + + Ok(()) + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + + if let Some(v) = &self.cloud_func_config_list { + for rule in v { + data.push_str(&rule.to_xml()) + } + } + + if let Some(v) = &self.queue_config_list { + for rule in v { + data.push_str(&rule.to_xml()) + } + } + + if let Some(v) = &self.topic_config_list { + for rule in v { + 
data.push_str(&rule.to_xml()) + } + } + + data.push_str(""); + return data; + } +} + +#[derive(Clone, Debug)] +pub struct AccessControlTranslation { + pub owner: String, +} + +impl AccessControlTranslation { + pub fn new() -> AccessControlTranslation { + AccessControlTranslation { + owner: String::from("Destination"), + } + } +} + +#[derive(Clone, Debug)] +pub struct EncryptionConfig { + pub replica_kms_key_id: Option, +} + +#[derive(Clone, Debug)] +pub struct Metrics { + pub event_threshold_minutes: Option, + pub status: bool, +} + +impl Metrics { + pub fn new(status: bool) -> Metrics { + Metrics { + event_threshold_minutes: Some(15), + status: status, + } + } +} + +#[derive(Clone, Debug)] +pub struct ReplicationTime { + pub time_minutes: Option, + pub status: bool, +} + +impl ReplicationTime { + pub fn new(status: bool) -> ReplicationTime { + ReplicationTime { + time_minutes: Some(15), + status: status, + } + } +} + +#[derive(Clone, Debug)] +pub struct Destination { + pub bucket_arn: String, + pub access_control_translation: Option, + pub account: Option, + pub encryption_config: Option, + pub metrics: Option, + pub replication_time: Option, + pub storage_class: Option, +} + +impl Destination { + pub fn from_xml(element: &Element) -> Result { + Ok(Destination { + bucket_arn: get_text(element, "Bucket")?, + access_control_translation: match element.get_child("AccessControlTranslation") { + Some(v) => Some(AccessControlTranslation { + owner: get_text(v, "Owner")?, + }), + _ => None, + }, + account: get_option_text(element, "Account"), + encryption_config: match element.get_child("EncryptionConfiguration") { + Some(v) => Some(EncryptionConfig { + replica_kms_key_id: get_option_text(v, "ReplicaKmsKeyID"), + }), + _ => None, + }, + metrics: match element.get_child("Metrics") { + Some(v) => Some(Metrics { + event_threshold_minutes: match get_option_text( + v.get_child("EventThreshold") + .ok_or(Error::XmlError(format!(" tag not found")))?, + "Minutes", + ) { + Some(v) => Some(v.parse::()?), + _ => None, + }, + status: get_text(v, "Status")? == "Enabled", + }), + _ => None, + }, + replication_time: match element.get_child("ReplicationTime") { + Some(v) => Some(ReplicationTime { + time_minutes: match get_option_text(v, "Time") { + Some(v) => Some(v.parse::()?), + _ => None, + }, + status: get_text(v, "Status")? 
== "Enabled", + }), + _ => None, + }, + storage_class: get_option_text(element, "StorageClass"), + }) + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + + data.push_str(""); + data.push_str(&self.bucket_arn); + data.push_str(""); + + if let Some(v) = &self.access_control_translation { + data.push_str(""); + data.push_str(&v.owner); + data.push_str(""); + } + + if let Some(v) = &self.account { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + + if let Some(c) = &self.encryption_config { + data.push_str(""); + if let Some(v) = &c.replica_kms_key_id { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + data.push_str(""); + } + + if let Some(m) = &self.metrics { + data.push_str(""); + + if let Some(v) = m.event_threshold_minutes { + data.push_str(""); + data.push_str(&v.to_string()); + data.push_str(""); + } + + data.push_str(""); + data.push_str(match m.status { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + + data.push_str(""); + } + + if let Some(t) = &self.replication_time { + data.push_str(""); + + data.push_str(""); + + data.push_str(""); + data.push_str(match t.status { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + + data.push_str(""); + } + + if let Some(v) = &self.storage_class { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + + data.push_str(""); + + return data; + } +} + +#[derive(Clone, Debug)] +pub struct SourceSelectionCriteria { + pub sse_kms_encrypted_objects_status: Option, +} + +#[derive(Clone, Debug)] +pub struct ReplicationRule { + pub destination: Destination, + pub delete_marker_replication_status: Option, + pub existing_object_replication_status: Option, + pub filter: Option, + pub id: Option, + pub prefix: Option, + pub priority: Option, + pub source_selection_criteria: Option, + pub delete_replication_status: Option, + pub status: bool, +} + +impl ReplicationRule { + pub fn from_xml(element: &Element) -> Result { + Ok(ReplicationRule { + destination: Destination::from_xml( + element + .get_child("Destination") + .ok_or(Error::XmlError(format!(" tag not found")))?, + )?, + delete_marker_replication_status: match element.get_child("DeleteMarkerReplication") { + Some(v) => Some(get_text(v, "Status")? == "Enabled"), + _ => None, + }, + existing_object_replication_status: match element.get_child("ExistingObjectReplication") + { + Some(v) => Some(get_text(v, "Status")? == "Enabled"), + _ => None, + }, + filter: match element.get_child("Filter") { + Some(v) => Some(Filter::from_xml(v)?), + _ => None, + }, + id: get_option_text(element, "ID"), + prefix: get_option_text(element, "Prefix"), + priority: match get_option_text(element, "Priority") { + Some(v) => Some(v.parse::()?), + _ => None, + }, + source_selection_criteria: match element.get_child("SourceSelectionCriteria") { + Some(v) => match v.get_child("SseKmsEncryptedObjects") { + Some(v) => Some(SourceSelectionCriteria { + sse_kms_encrypted_objects_status: Some(get_text(v, "Status")? == "Enabled"), + }), + _ => Some(SourceSelectionCriteria { + sse_kms_encrypted_objects_status: None, + }), + }, + _ => None, + }, + delete_replication_status: match element.get_child("DeleteReplication") { + Some(v) => Some(get_text(v, "Status")? == "Enabled"), + _ => None, + }, + status: get_text(element, "Status")? 
== "Enabled", + }) + } + + pub fn to_xml(&self) -> String { + let mut data = self.destination.to_xml(); + + if let Some(v) = self.delete_marker_replication_status { + data.push_str(""); + data.push_str(""); + data.push_str(match v { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + data.push_str(""); + } + + if let Some(v) = self.existing_object_replication_status { + data.push_str(""); + data.push_str(""); + data.push_str(match v { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + data.push_str(""); + } + + if let Some(v) = &self.filter { + data.push_str(&v.to_xml()) + } + + if let Some(v) = &self.id { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + + if let Some(v) = &self.prefix { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + + if let Some(v) = self.priority { + data.push_str(""); + data.push_str(&v.to_string()); + data.push_str(""); + } + + if let Some(s) = &self.source_selection_criteria { + data.push_str(""); + if let Some(v) = s.sse_kms_encrypted_objects_status { + data.push_str(""); + data.push_str(""); + data.push_str(match v { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + data.push_str(""); + } + data.push_str(""); + } + + if let Some(v) = self.delete_replication_status { + data.push_str(""); + data.push_str(""); + data.push_str(match v { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + data.push_str(""); + } + + data.push_str(""); + data.push_str(match self.status { + true => "Enabled", + false => "Disabled", + }); + data.push_str(""); + + return data; + } +} + +#[derive(Clone, Debug)] +pub struct ReplicationConfig { + pub role: Option, + pub rules: Vec, +} + +impl ReplicationConfig { + pub fn from_xml(root: &Element) -> Result { + let mut config = ReplicationConfig { + role: get_option_text(root, "Role"), + rules: Vec::new(), + }; + + if let Some(v) = root.get_child("Rule") { + for rule in &v.children { + config.rules.push(ReplicationRule::from_xml( + rule.as_element() + .ok_or(Error::XmlError(format!(" tag not found")))?, + )?); + } + } + + return Ok(config); + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + + if let Some(v) = &self.role { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + + for rule in &self.rules { + data.push_str(&rule.to_xml()); + } + + data.push_str(""); + return data; + } +} + +#[derive(Clone, Debug)] +pub struct ObjectLockConfig { + pub retention_mode: Option, + pub retention_duration_days: Option, + pub retention_duration_years: Option, +} + +impl ObjectLockConfig { + pub fn new( + mode: RetentionMode, + days: Option, + years: Option, + ) -> Result { + if days.is_some() ^ years.is_some() { + return Ok(ObjectLockConfig { + retention_mode: Some(mode), + retention_duration_days: days, + retention_duration_years: years, + }); + } + + Err(Error::InvalidObjectLockConfig(format!( + "only one days or years must be set" + ))) + } + + pub fn from_xml(root: &Element) -> Result { + let mut config = ObjectLockConfig { + retention_mode: None, + retention_duration_days: None, + retention_duration_years: None, + }; + + if let Some(r) = root.get_child("Rule") { + let default_retention = r + .get_child("DefaultRetention") + .ok_or(Error::XmlError(format!(" tag not found")))?; + config.retention_mode = + Some(RetentionMode::parse(&get_text(default_retention, "Mode")?)?); + + if let Some(v) = get_option_text(default_retention, "Days") { + config.retention_duration_days = 
Some(v.parse::()?); + } + + if let Some(v) = get_option_text(default_retention, "Years") { + config.retention_duration_years = Some(v.parse::()?); + } + } + + return Ok(config); + } + + pub fn to_xml(&self) -> String { + let mut data = String::from(""); + data.push_str("Enabled"); + if let Some(v) = &self.retention_mode { + data.push_str(""); + data.push_str(""); + data.push_str(&v.to_string()); + data.push_str(""); + if let Some(d) = self.retention_duration_days { + data.push_str(""); + data.push_str(&d.to_string()); + data.push_str(""); + } + if let Some(d) = self.retention_duration_years { + data.push_str(""); + data.push_str(&d.to_string()); + data.push_str(""); + } + data.push_str(""); + } + data.push_str(""); + + return data; + } +} diff --git a/src/s3/utils.rs b/src/s3/utils.rs index 691e1b3..15e5ecf 100644 --- a/src/s3/utils.rs +++ b/src/s3/utils.rs @@ -262,15 +262,12 @@ pub fn get_text(element: &Element, tag: &str) -> Result { .to_string()) } -pub fn get_option_text(element: &Element, tag: &str) -> Result, Error> { - Ok(match element.get_child(tag) { - Some(v) => Some( - v.get_text() - .ok_or(Error::XmlError(format!("text of <{}> tag not found", tag)))? - .to_string(), - ), - None => None, - }) +pub fn get_option_text(element: &Element, tag: &str) -> Option { + if let Some(v) = element.get_child(tag) { + return Some(v.get_text().unwrap_or_default().to_string()); + } + + None } pub fn get_default_text(element: &Element, tag: &str) -> String { diff --git a/tests/start-server.sh b/tests/start-server.sh index e9c7b1a..0a3d330 100755 --- a/tests/start-server.sh +++ b/tests/start-server.sh @@ -3,10 +3,14 @@ set -x set -e -wget --quiet https://dl.min.io/server/minio/release/linux-amd64/minio && \ - chmod +x minio && \ - mkdir -p /tmp/certs && \ - cp ./tests/public.crt ./tests/private.key /tmp/certs/ && \ +wget --quiet https://dl.min.io/server/minio/release/linux-amd64/minio +chmod +x minio +mkdir -p /tmp/certs +cp ./tests/public.crt ./tests/private.key /tmp/certs/ + +(MINIO_CI_CD=true \ + MINIO_NOTIFY_WEBHOOK_ENABLE_miniojavatest=on \ + MINIO_NOTIFY_WEBHOOK_ENDPOINT_miniojavatest=http://example.org/ \ + ./minio server /tmp/test-xl/{1...4}/ --certs-dir /tmp/certs/ &) -MINIO_CI_CD=true ./minio server /tmp/test-xl/{1...4}/ --certs-dir /tmp/certs/ & sleep 10 diff --git a/tests/tests.rs b/tests/tests.rs index bb5333f..603c480 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -14,9 +14,14 @@ // limitations under the License. 
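For reviewers of the `ObjectLockConfig` type introduced in `src/s3/types.rs` above, here is a minimal sketch of the constructor's days-XOR-years validation and of the XML body that `set_object_lock_config` sends with the `object-lock` query parameter. It assumes `ObjectLockConfig` and `RetentionMode` are reachable via `minio::s3::types`; the retention values are arbitrary.

```rust
use minio::s3::types::{ObjectLockConfig, RetentionMode};

fn main() {
    // Valid: exactly one of days/years is set.
    let config = ObjectLockConfig::new(RetentionMode::GOVERNANCE, Some(30), None).unwrap();

    // to_xml() produces the object-lock configuration document that
    // set_object_lock_config() PUTs to the bucket.
    println!("{}", config.to_xml());

    // Both set, or neither set, is rejected by the XOR check in new()
    // with Error::InvalidObjectLockConfig.
    assert!(ObjectLockConfig::new(RetentionMode::COMPLIANCE, Some(30), Some(1)).is_err());
    assert!(ObjectLockConfig::new(RetentionMode::GOVERNANCE, None, None).is_err());
}
```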
use async_std::task; +use chrono::Duration; +use hyper::http::Method; use minio::s3::types::NotificationRecords; use rand::distributions::{Alphanumeric, DistString}; +use sha2::{Digest, Sha256}; +use std::collections::HashMap; use std::io::BufReader; +use std::{fs, io}; use tokio::sync::mpsc; use minio::s3::args::*; @@ -24,9 +29,11 @@ use minio::s3::client::Client; use minio::s3::creds::StaticProvider; use minio::s3::http::BaseUrl; use minio::s3::types::{ - CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo, QuoteFields, - SelectRequest, + CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo, + NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, + RetentionMode, SelectRequest, SuffixFilterRule, }; +use minio::s3::utils::{to_iso8601utc, utc_now}; struct RandReader { size: usize, @@ -77,6 +84,8 @@ struct ClientTest<'a> { } impl<'a> ClientTest<'_> { + const SQS_ARN: &str = "arn:minio:sqs::miniojavatest:webhook"; + fn new( base_url: BaseUrl, access_key: String, @@ -255,6 +264,89 @@ impl<'a> ClientTest<'_> { .unwrap(); } + fn get_hash(filename: &String) -> String { + let mut hasher = Sha256::new(); + let mut file = fs::File::open(filename).unwrap(); + io::copy(&mut file, &mut hasher).unwrap(); + return format!("{:x}", hasher.finalize()); + } + + async fn upload_download_object(&self) { + let object_name = rand_object_name(); + let size = 16_usize; + let mut file = fs::File::create(&object_name).unwrap(); + io::copy(&mut RandReader::new(size), &mut file).unwrap(); + file.sync_all().unwrap(); + self.client + .upload_object( + &mut UploadObjectArgs::new(&self.test_bucket, &object_name, &object_name).unwrap(), + ) + .await + .unwrap(); + + let filename = rand_object_name(); + self.client + .download_object( + &DownloadObjectArgs::new(&self.test_bucket, &object_name, &filename).unwrap(), + ) + .await + .unwrap(); + assert_eq!( + ClientTest::get_hash(&object_name) == ClientTest::get_hash(&filename), + true + ); + + fs::remove_file(&object_name).unwrap(); + fs::remove_file(&filename).unwrap(); + + self.client + .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + + self.client + .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + + let object_name = rand_object_name(); + let size: usize = 16 + 5 * 1024 * 1024; + let mut file = fs::File::create(&object_name).unwrap(); + io::copy(&mut RandReader::new(size), &mut file).unwrap(); + file.sync_all().unwrap(); + self.client + .upload_object( + &mut UploadObjectArgs::new(&self.test_bucket, &object_name, &object_name).unwrap(), + ) + .await + .unwrap(); + + let filename = rand_object_name(); + self.client + .download_object( + &DownloadObjectArgs::new(&self.test_bucket, &object_name, &filename).unwrap(), + ) + .await + .unwrap(); + assert_eq!( + ClientTest::get_hash(&object_name) == ClientTest::get_hash(&filename), + true + ); + + fs::remove_file(&object_name).unwrap(); + fs::remove_file(&filename).unwrap(); + + self.client + .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + + self.client + .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + } + async fn remove_objects(&self) { let bucket_name = rand_bucket_name(); self.client @@ -598,6 +690,479 @@ impl<'a> ClientTest<'_> { .await .unwrap(); } + + async fn set_get_delete_bucket_notification(&self) { + let bucket_name = 
rand_bucket_name(); + self.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + self.client + .set_bucket_notification( + &SetBucketNotificationArgs::new( + &bucket_name, + &NotificationConfig { + cloud_func_config_list: None, + queue_config_list: Some(vec![QueueConfig { + events: vec![ + String::from("s3:ObjectCreated:Put"), + String::from("s3:ObjectCreated:Copy"), + ], + id: None, + prefix_filter_rule: Some(PrefixFilterRule { + value: String::from("images"), + }), + suffix_filter_rule: Some(SuffixFilterRule { + value: String::from("pg"), + }), + queue: String::from(ClientTest::SQS_ARN), + }]), + topic_config_list: None, + }, + ) + .unwrap(), + ) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_notification(&GetBucketNotificationArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config.queue_config_list.as_ref().unwrap().len(), 1); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0] + .events + .contains(&String::from("s3:ObjectCreated:Put")), + true + ); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0] + .events + .contains(&String::from("s3:ObjectCreated:Copy")), + true + ); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0] + .prefix_filter_rule + .as_ref() + .unwrap() + .value, + "images" + ); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0] + .suffix_filter_rule + .as_ref() + .unwrap() + .value, + "pg" + ); + assert_eq!( + resp.config.queue_config_list.as_ref().unwrap()[0].queue, + ClientTest::SQS_ARN + ); + + self.client + .delete_bucket_notification(&DeleteBucketNotificationArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_notification(&GetBucketNotificationArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config.queue_config_list.is_none(), true); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + + async fn set_get_delete_bucket_policy(&self) { + let bucket_name = rand_bucket_name(); + self.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let config = r#" +{ + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "s3:GetObject" + ], + "Effect": "Allow", + "Principal": { + "AWS": [ + "*" + ] + }, + "Resource": [ + "arn:aws:s3:::/myobject*" + ], + "Sid": "" + } + ] +} +"# + .replace("", &bucket_name); + + self.client + .set_bucket_policy(&SetBucketPolicyArgs::new(&bucket_name, &config).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_policy(&GetBucketPolicyArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config.is_empty(), false); + + self.client + .delete_bucket_policy(&DeleteBucketPolicyArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_policy(&GetBucketPolicyArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config, "{}"); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + + async fn set_get_delete_bucket_tags(&self) { + let bucket_name = rand_bucket_name(); + self.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let tags = HashMap::from([ + (String::from("Project"), String::from("Project One")), + (String::from("User"), String::from("jsmith")), + ]); + + self.client + .set_bucket_tags(&SetBucketTagsArgs::new(&bucket_name, 
&tags).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_tags(&GetBucketTagsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!( + resp.tags.len() == tags.len() && resp.tags.keys().all(|k| tags.contains_key(k)), + true + ); + + self.client + .delete_bucket_tags(&DeleteBucketTagsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_tags(&GetBucketTagsArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.tags.is_empty(), true); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + + async fn set_get_delete_object_lock_config(&self) { + let bucket_name = rand_bucket_name(); + + let mut args = MakeBucketArgs::new(&bucket_name).unwrap(); + args.object_lock = true; + self.client.make_bucket(&args).await.unwrap(); + + self.client + .set_object_lock_config( + &SetObjectLockConfigArgs::new( + &bucket_name, + &ObjectLockConfig::new(RetentionMode::GOVERNANCE, Some(7), None).unwrap(), + ) + .unwrap(), + ) + .await + .unwrap(); + + let resp = self + .client + .get_object_lock_config(&GetObjectLockConfigArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!( + match resp.config.retention_mode { + Some(r) => match r { + RetentionMode::GOVERNANCE => true, + _ => false, + }, + _ => false, + }, + true + ); + + assert_eq!(resp.config.retention_duration_days == Some(7), true); + assert_eq!(resp.config.retention_duration_years.is_none(), true); + + self.client + .delete_object_lock_config(&DeleteObjectLockConfigArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_object_lock_config(&GetObjectLockConfigArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.config.retention_mode.is_none(), true); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + + async fn set_get_delete_object_tags(&self) { + let object_name = rand_object_name(); + + let size = 16_usize; + self.client + .put_object( + &mut PutObjectArgs::new( + &self.test_bucket, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) + .await + .unwrap(); + + let tags = HashMap::from([ + (String::from("Project"), String::from("Project One")), + (String::from("User"), String::from("jsmith")), + ]); + + self.client + .set_object_tags( + &SetObjectTagsArgs::new(&self.test_bucket, &object_name, &tags).unwrap(), + ) + .await + .unwrap(); + + let resp = self + .client + .get_object_tags(&GetObjectTagsArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!( + resp.tags.len() == tags.len() && resp.tags.keys().all(|k| tags.contains_key(k)), + true + ); + + self.client + .delete_object_tags( + &DeleteObjectTagsArgs::new(&self.test_bucket, &object_name).unwrap(), + ) + .await + .unwrap(); + + let resp = self + .client + .get_object_tags(&GetObjectTagsArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.tags.is_empty(), true); + + self.client + .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .await + .unwrap(); + } + + async fn set_get_bucket_versioning(&self) { + let bucket_name = rand_bucket_name(); + + self.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + self.client + .set_bucket_versioning(&SetBucketVersioningArgs::new(&bucket_name, true).unwrap()) + .await + .unwrap(); + + let resp = self + 
.client + .get_bucket_versioning(&GetBucketVersioningArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!( + match resp.status { + Some(v) => v, + _ => false, + }, + true + ); + + self.client + .set_bucket_versioning(&SetBucketVersioningArgs::new(&bucket_name, false).unwrap()) + .await + .unwrap(); + + let resp = self + .client + .get_bucket_versioning(&GetBucketVersioningArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + assert_eq!( + match resp.status { + Some(v) => v, + _ => false, + }, + false + ); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + + async fn set_get_object_retention(&self) { + let bucket_name = rand_bucket_name(); + + let mut args = MakeBucketArgs::new(&bucket_name).unwrap(); + args.object_lock = true; + self.client.make_bucket(&args).await.unwrap(); + + let object_name = rand_object_name(); + + let size = 16_usize; + let obj_resp = self + .client + .put_object( + &mut PutObjectArgs::new( + &bucket_name, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) + .await + .unwrap(); + + let mut args = SetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap(); + args.retention_mode = Some(RetentionMode::GOVERNANCE); + let retain_until_date = utc_now() + Duration::days(1); + args.retain_until_date = Some(retain_until_date); + + self.client.set_object_retention(&args).await.unwrap(); + + let resp = self + .client + .get_object_retention(&GetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!( + match resp.retention_mode { + Some(v) => match v { + RetentionMode::GOVERNANCE => true, + _ => false, + }, + _ => false, + }, + true + ); + assert_eq!( + match resp.retain_until_date { + Some(v) => to_iso8601utc(v) == to_iso8601utc(retain_until_date), + _ => false, + }, + true, + ); + + let mut args = SetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap(); + args.bypass_governance_mode = true; + self.client.set_object_retention(&args).await.unwrap(); + + let resp = self + .client + .get_object_retention(&GetObjectRetentionArgs::new(&bucket_name, &object_name).unwrap()) + .await + .unwrap(); + assert_eq!(resp.retention_mode.is_none(), true); + assert_eq!(resp.retain_until_date.is_none(), true); + + let mut args = RemoveObjectArgs::new(&bucket_name, &object_name).unwrap(); + let version_id = obj_resp.version_id.unwrap().clone(); + args.version_id = Some(version_id.as_str()); + self.client.remove_object(&args).await.unwrap(); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + + async fn get_presigned_object_url(&self) { + let object_name = rand_object_name(); + let resp = self + .client + .get_presigned_object_url( + &GetPresignedObjectUrlArgs::new(&self.test_bucket, &object_name, Method::GET) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.url.contains("X-Amz-Signature="), true); + } + + async fn get_presigned_post_form_data(&self) { + let object_name = rand_object_name(); + let expiration = utc_now() + Duration::days(5); + + let mut policy = PostPolicy::new(&self.test_bucket, &expiration).unwrap(); + policy.add_equals_condition("key", &object_name).unwrap(); + policy + .add_content_length_range_condition(1 * 1024 * 1024, 4 * 1024 * 1024) + .unwrap(); + + let form_data = self + .client + .get_presigned_post_form_data(&policy) + .await + .unwrap(); + assert_eq!(form_data.contains_key("x-amz-signature"), true); + 
assert_eq!(form_data.contains_key("policy"), true); + } } #[tokio::main] @@ -628,9 +1193,6 @@ async fn s3_tests() -> Result<(), Box> { ); ctest.init().await; - println!("compose_object()"); - ctest.compose_object().await; - println!("make_bucket() + bucket_exists() + remove_bucket()"); ctest.bucket_exists().await; @@ -646,6 +1208,9 @@ async fn s3_tests() -> Result<(), Box> { println!("get_object()"); ctest.get_object().await; + println!("{{upload,download}}_object()"); + ctest.upload_download_object().await; + println!("remove_objects()"); ctest.remove_objects().await; @@ -661,6 +1226,36 @@ async fn s3_tests() -> Result<(), Box> { println!("copy_object()"); ctest.copy_object().await; + println!("compose_object()"); + ctest.compose_object().await; + + println!("{{set,get,delete}}_bucket_notification()"); + ctest.set_get_delete_bucket_notification().await; + + println!("{{set,get,delete}}_bucket_policy()"); + ctest.set_get_delete_bucket_policy().await; + + println!("{{set,get,delete}}_bucket_tags()"); + ctest.set_get_delete_bucket_tags().await; + + println!("{{set,get,delete}}_object_lock_config()"); + ctest.set_get_delete_object_lock_config().await; + + println!("{{set,get,delete}}_object_tags()"); + ctest.set_get_delete_object_tags().await; + + println!("{{set,get}}_bucket_versioning()"); + ctest.set_get_bucket_versioning().await; + + println!("{{set,get}}_object_retention()"); + ctest.set_get_object_retention().await; + + println!("get_presigned_object_url()"); + ctest.get_presigned_object_url().await; + + println!("get_presigned_post_form_data()"); + ctest.get_presigned_post_form_data().await; + ctest.drop().await; Ok(())