Add builder style remove_object(s) APIs (#85)

- remove older APIs
This commit is contained in:
Aditya Manthramurthy 2024-04-12 18:21:32 -07:00 committed by GitHub
parent 773ad9133f
commit d5a648f03c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 745 additions and 352 deletions

View File

@ -19,14 +19,14 @@ use crate::s3::error::Error;
use crate::s3::signer::post_presign_v4;
use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{
DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, ReplicationConfig,
Retention, RetentionMode, SelectRequest, SseConfig,
};
use crate::s3::utils::{
b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc,
to_signer_date, urlencode, utc_now, Multimap, UtcTime,
};
use derivative::Derivative;
use hyper::http::Method;
use serde_json::json;
use serde_json::Value;
@ -881,90 +881,6 @@ pub type StatObjectArgs<'a> = ObjectConditionalReadArgs<'a>;
/// Source object information for [copy object argument](CopyObjectArgs)
pub type CopySource<'a> = ObjectConditionalReadArgs<'a>;
#[derive(Derivative, Clone, Debug, Default)]
/// Argument for [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API
pub struct RemoveObjectsApiArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub bypass_governance_mode: bool,
#[derivative(Default(value = "true"))]
pub quiet: bool,
pub objects: &'a [DeleteObject<'a>],
}
impl<'a> RemoveObjectsApiArgs<'a> {
/// Returns argument for [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API with given bucket name and list of delete object information
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// use minio::s3::types::DeleteObject;
/// let mut objects: Vec<DeleteObject> = Vec::new();
/// objects.push(DeleteObject{name: "my-object-1", version_id: None});
/// objects.push(DeleteObject{name: "my-object-2", version_id: Some("0e295d23-10e1-4c39-b134-5b08ad146df6")});
/// let args = RemoveObjectsApiArgs::new("my-bucket", &objects).unwrap();
/// ```
pub fn new(
bucket_name: &'a str,
objects: &'a [DeleteObject],
) -> Result<RemoveObjectsApiArgs<'a>, Error> {
check_bucket_name(bucket_name, true)?;
Ok(RemoveObjectsApiArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bypass_governance_mode: false,
quiet: true,
objects,
})
}
}
/// Argument for [remove_objects()](crate::s3::client::Client::remove_objects) API
pub struct RemoveObjectsArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub bypass_governance_mode: bool,
pub objects: &'a mut core::slice::Iter<'a, DeleteObject<'a>>,
}
impl<'a> RemoveObjectsArgs<'a> {
/// Returns argument for [remove_objects()](crate::s3::client::Client::remove_objects) API with given bucket name and iterable delete object information
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// use minio::s3::types::DeleteObject;
/// let mut objects: Vec<DeleteObject> = Vec::new();
/// objects.push(DeleteObject{name: "my-object-1", version_id: None});
/// objects.push(DeleteObject{name: "my-object-2", version_id: Some("0e295d23-10e1-4c39-b134-5b08ad146df6")});
/// let args = RemoveObjectsArgs::new("my-bucket", &mut objects.iter()).unwrap();
/// ```
pub fn new(
bucket_name: &'a str,
objects: &'a mut core::slice::Iter<'a, DeleteObject<'a>>,
) -> Result<RemoveObjectsArgs<'a>, Error> {
check_bucket_name(bucket_name, true)?;
Ok(RemoveObjectsArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bypass_governance_mode: false,
objects,
})
}
}
/// Argument for [select_object_content()](crate::s3::client::Client::select_object_content) API
pub struct SelectObjectContentArgs<'a> {
pub extra_headers: Option<&'a Multimap>,

View File

@ -18,6 +18,7 @@ mod list_objects;
mod listen_bucket_notification;
mod object_content;
mod put_object;
mod remove_objects;
pub use buckets::*;
pub use get_object::*;
@ -25,3 +26,4 @@ pub use list_objects::*;
pub use listen_bucket_notification::*;
pub use object_content::*;
pub use put_object::*;
pub use remove_objects::*;

View File

@ -0,0 +1,419 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2022-2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Builders for RemoveObject APIs.
use std::pin::Pin;
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::{stream as futures_stream, Stream, StreamExt};
use http::Method;
use tokio_stream::iter as stream_iter;
use crate::s3::{
client_core::ClientCore,
error::Error,
response::{RemoveObjectResponse2, RemoveObjectsResponse},
types::{S3Api, S3Request, ToS3Request, ToStream},
utils::{check_bucket_name, md5sum_hash, merge, Multimap},
Client,
};
/// Specify an object to be deleted. The object can be specified by key or by
/// key and version_id via the From trait.
#[derive(Debug, Clone)]
pub struct ObjectToDelete {
key: String,
version_id: Option<String>,
}
/// A key can be converted into a DeleteObject. The version_id is set to None.
impl From<&str> for ObjectToDelete {
fn from(key: &str) -> Self {
ObjectToDelete {
key: key.to_string(),
version_id: None,
}
}
}
/// A tuple of key and version_id can be converted into a DeleteObject.
impl From<(&str, &str)> for ObjectToDelete {
fn from((key, version_id): (&str, &str)) -> Self {
ObjectToDelete {
key: key.to_string(),
version_id: Some(version_id.to_string()),
}
}
}
/// A tuple of key and option version_id can be converted into a DeleteObject.
impl From<(&str, Option<&str>)> for ObjectToDelete {
fn from((key, version_id): (&str, Option<&str>)) -> Self {
ObjectToDelete {
key: key.to_string(),
version_id: version_id.map(|v| v.to_string()),
}
}
}
#[derive(Debug, Clone)]
pub struct RemoveObject {
client: Option<Client>,
bucket: String,
object: ObjectToDelete,
bypass_governance_mode: bool,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
}
impl RemoveObject {
pub fn new(bucket: &str, object: impl Into<ObjectToDelete>) -> Self {
Self {
client: None,
bucket: bucket.to_string(),
object: object.into(),
bypass_governance_mode: false,
extra_headers: None,
extra_query_params: None,
region: None,
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self {
self.bypass_governance_mode = bypass_governance_mode;
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
}
impl S3Api for RemoveObject {
type S3Response = RemoveObjectResponse2;
}
impl ToS3Request for RemoveObject {
fn to_s3request(&self) -> Result<S3Request, Error> {
check_bucket_name(&self.bucket, true)?;
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = &self.object.version_id {
query_params.insert(String::from("versionId"), v.to_string());
}
let req = S3Request::new(
self.client.as_ref().ok_or(Error::NoClientProvided)?,
Method::DELETE,
)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.object(Some(&self.object.key))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}
#[derive(Debug, Clone)]
pub struct RemoveObjectsApi {
client: Option<ClientCore>,
bucket: String,
objects: Vec<ObjectToDelete>,
bypass_governance_mode: bool,
verbose_mode: bool,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
}
impl RemoveObjectsApi {
pub fn new(bucket: &str, objects: Vec<ObjectToDelete>) -> Self {
RemoveObjectsApi {
client: None,
bucket: bucket.to_string(),
objects,
bypass_governance_mode: false,
verbose_mode: false,
extra_headers: None,
extra_query_params: None,
region: None,
}
}
pub fn client(mut self, client: &ClientCore) -> Self {
self.client = Some(client.clone());
self
}
pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self {
self.bypass_governance_mode = bypass_governance_mode;
self
}
/// Enable verbose mode (defaults to false). If enabled, the response will
/// include the keys of objects that were successfully deleted. Otherwise
/// only objects that encountered an error are returned.
pub fn verbose_mode(mut self, verbose_mode: bool) -> Self {
self.verbose_mode = verbose_mode;
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
}
impl ToS3Request for RemoveObjectsApi {
fn to_s3request(&self) -> Result<S3Request, Error> {
check_bucket_name(&self.bucket, true)?;
let mut data = String::from("<Delete>");
if !self.verbose_mode {
data.push_str("<Quiet>true</Quiet>");
}
for object in self.objects.iter() {
data.push_str("<Object>");
data.push_str("<Key>");
data.push_str(&object.key);
data.push_str("</Key>");
if let Some(v) = object.version_id.as_ref() {
data.push_str("<VersionId>");
data.push_str(&v);
data.push_str("</VersionId>");
}
data.push_str("</Object>");
}
data.push_str("</Delete>");
let data: Bytes = data.into();
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
if self.bypass_governance_mode {
headers.insert(
String::from("x-amz-bypass-governance-retention"),
String::from("true"),
);
}
headers.insert(
String::from("Content-Type"),
String::from("application/xml"),
);
headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref()));
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("delete"), String::new());
let client = self.client.as_ref().ok_or(Error::NoClientProvided)?.inner();
let req = S3Request::new(client, Method::POST)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.query_params(query_params)
.headers(headers)
.body(Some(data.into()));
Ok(req)
}
}
impl S3Api for RemoveObjectsApi {
type S3Response = RemoveObjectsResponse;
}
pub struct DeleteObjects {
items: Pin<Box<dyn Stream<Item = ObjectToDelete> + Send + Sync>>,
}
impl DeleteObjects {
pub fn from_stream(s: impl Stream<Item = ObjectToDelete> + Send + Sync + 'static) -> Self {
DeleteObjects { items: Box::pin(s) }
}
}
impl From<ObjectToDelete> for DeleteObjects {
fn from(delete_object: ObjectToDelete) -> Self {
DeleteObjects::from_stream(stream_iter(std::iter::once(delete_object)))
}
}
impl<I: Iterator<Item = ObjectToDelete> + Send + Sync + 'static> From<I> for DeleteObjects {
fn from(keys: I) -> Self {
DeleteObjects::from_stream(stream_iter(keys))
}
}
pub struct RemoveObjects {
client: Option<Client>,
bucket: String,
objects: DeleteObjects,
bypass_governance_mode: bool,
verbose_mode: bool,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
}
impl RemoveObjects {
pub fn new(bucket: &str, objects: impl Into<DeleteObjects>) -> Self {
RemoveObjects {
client: None,
bucket: bucket.to_string(),
objects: objects.into(),
bypass_governance_mode: false,
verbose_mode: false,
extra_headers: None,
extra_query_params: None,
region: None,
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self {
self.bypass_governance_mode = bypass_governance_mode;
self
}
/// Enable verbose mode (defaults to false). If enabled, the response will
/// include the keys of objects that were successfully deleted. Otherwise
/// only objects that encountered an error are returned.
pub fn verbose_mode(mut self, verbose_mode: bool) -> Self {
self.verbose_mode = verbose_mode;
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
async fn next_request(&mut self) -> Result<Option<RemoveObjectsApi>, Error> {
let mut objects = Vec::new();
while let Some(object) = self.objects.items.next().await {
objects.push(object);
if objects.len() >= 1000 {
break;
}
}
if objects.is_empty() {
return Ok(None);
}
let client_core = ClientCore::new(self.client.as_ref().ok_or(Error::NoClientProvided)?);
let request = RemoveObjectsApi::new(&self.bucket, objects)
.client(&client_core)
.bypass_governance_mode(self.bypass_governance_mode)
.verbose_mode(self.verbose_mode)
.extra_headers(self.extra_headers.clone())
.extra_query_params(self.extra_query_params.clone())
.region(self.region.clone());
Ok(Some(request))
}
}
#[async_trait]
impl ToStream for RemoveObjects {
type Item = RemoveObjectsResponse;
async fn to_stream(
mut self,
) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send> {
Box::new(Box::pin(futures_stream::unfold(
self,
move |mut this| async move {
match this.next_request().await {
Ok(Some(request)) => {
let response = request.send().await;
Some((response, this))
}
Ok(None) => None,
Err(e) => Some((Err(e), this)),
}
},
)))
}
}

View File

@ -29,8 +29,8 @@ use crate::s3::response::*;
use crate::s3::signer::{presign_v4, sign_v4_s3};
use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{
DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ReplicationConfig, RetentionMode, SseConfig,
Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, ReplicationConfig,
RetentionMode, SseConfig,
};
use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, md5sum_hash_sb,
@ -51,6 +51,7 @@ mod get_object;
mod list_objects;
mod listen_bucket_notification;
mod put_object;
mod remove_objects;
use super::builders::{GetBucketVersioning, ListBuckets, SegmentedBytes};
@ -2566,175 +2567,6 @@ impl Client {
})
}
pub async fn remove_object(
&self,
args: &RemoveObjectArgs<'_>,
) -> Result<RemoveObjectResponse, Error> {
let region = self.get_region(args.bucket, args.region).await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.version_id {
query_params.insert(String::from("versionId"), v.to_string());
}
let resp = self
.execute(
Method::DELETE,
&region,
&mut headers,
&query_params,
Some(args.bucket),
Some(args.object),
None,
)
.await?;
Ok(RemoveObjectResponse {
headers: resp.headers().clone(),
region: region.to_string(),
bucket_name: args.bucket.to_string(),
object_name: args.object.to_string(),
version_id: args.version_id.map(|v| v.to_string()),
})
}
/// Executes [DeleteObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) S3 API
pub async fn remove_objects_api(
&self,
args: &RemoveObjectsApiArgs<'_>,
) -> Result<RemoveObjectsApiResponse, Error> {
let region = self.get_region(args.bucket, args.region).await?;
let mut data = String::from("<Delete>");
if args.quiet {
data.push_str("<Quiet>true</Quiet>");
}
for object in args.objects.iter() {
data.push_str("<Object>");
data.push_str("<Key>");
data.push_str(object.name);
data.push_str("</Key>");
if let Some(v) = object.version_id {
data.push_str("<VersionId>");
data.push_str(v);
data.push_str("</VersionId>");
}
data.push_str("</Object>");
}
data.push_str("</Delete>");
let data: Bytes = data.into();
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
if args.bypass_governance_mode {
headers.insert(
String::from("x-amz-bypass-governance-retention"),
String::from("true"),
);
}
headers.insert(
String::from("Content-Type"),
String::from("application/xml"),
);
headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref()));
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("delete"), String::new());
let resp = self
.execute(
Method::POST,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
Some(data),
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let mut objects: Vec<DeletedObject> = Vec::new();
while let Some(v) = root.take_child("Deleted") {
let deleted = v;
objects.push(DeletedObject {
name: get_text(&deleted, "Key")?,
version_id: get_option_text(&deleted, "VersionId"),
delete_marker: get_text(&deleted, "DeleteMarker")?.to_lowercase() == "true",
delete_marker_version_id: get_option_text(&deleted, "DeleteMarkerVersionId"),
})
}
let mut errors: Vec<DeleteError> = Vec::new();
while let Some(v) = root.take_child("Error") {
let error = v;
errors.push(DeleteError {
code: get_text(&error, "Code")?,
message: get_text(&error, "Message")?,
object_name: get_text(&error, "Key")?,
version_id: get_option_text(&error, "VersionId"),
})
}
Ok(RemoveObjectsApiResponse {
headers: header_map.clone(),
region: region.clone(),
bucket_name: args.bucket.to_string(),
objects,
errors,
})
}
pub async fn remove_objects(
&self,
args: &mut RemoveObjectsArgs<'_>,
) -> Result<RemoveObjectsResponse, Error> {
let region = self.get_region(args.bucket, args.region).await?;
loop {
let mut objects: Vec<DeleteObject> = Vec::new();
for object in args.objects.take(1000) {
objects.push(*object);
}
if objects.is_empty() {
break;
}
let mut roa_args = RemoveObjectsApiArgs::new(args.bucket, &objects)?;
roa_args.extra_headers = args.extra_headers;
roa_args.extra_query_params = args.extra_query_params;
roa_args.region = args.region;
roa_args.bypass_governance_mode = args.bypass_governance_mode;
roa_args.quiet = true;
let resp = self.remove_objects_api(&roa_args).await?;
if !resp.errors.is_empty() {
return Ok(resp);
}
}
Ok(RemoveObjectsResponse {
headers: HeaderMap::new(),
region: region.to_string(),
bucket_name: args.bucket.to_string(),
objects: vec![],
errors: vec![],
})
}
pub async fn set_bucket_encryption(
&self,
args: &SetBucketEncryptionArgs<'_>,

View File

@ -0,0 +1,31 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2022-2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! APIs to remove objects.
use crate::s3::{
builders::{DeleteObjects, ObjectToDelete, RemoveObject, RemoveObjects},
client::Client,
};
impl Client {
pub fn remove_object(&self, bucket: &str, object: impl Into<ObjectToDelete>) -> RemoveObject {
RemoveObject::new(bucket, object).client(self)
}
pub fn remove_objects(&self, bucket: &str, object: impl Into<DeleteObjects>) -> RemoveObjects {
RemoveObjects::new(bucket, object).client(self)
}
}

41
src/s3/client_core.rs Normal file
View File

@ -0,0 +1,41 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2022-2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Module containing lower level APIs.
use super::{
builders::{ObjectToDelete, RemoveObjectsApi},
Client,
};
#[derive(Debug, Clone)]
pub struct ClientCore(Client);
impl ClientCore {
pub fn new(client: &Client) -> Self {
Self(client.clone())
}
pub(crate) fn inner(&self) -> &Client {
&self.0
}
/// Creates a builder to execute
/// [DeleteObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html)
/// S3 API
pub fn delete_objects(&self, bucket: &str, object: Vec<ObjectToDelete>) -> RemoveObjectsApi {
RemoveObjectsApi::new(bucket, object).client(self)
}
}

View File

@ -18,6 +18,7 @@
pub mod args;
pub mod builders;
pub mod client;
pub mod client_core;
pub mod creds;
pub mod error;
pub mod http;

View File

@ -36,6 +36,7 @@ mod get_object;
mod list_objects;
mod listen_bucket_notification;
mod put_object;
mod remove_objects;
pub use buckets::{GetBucketVersioningResponse, ListBucketsResponse};
pub use get_object::GetObjectResponse2;
@ -48,6 +49,9 @@ pub use put_object::{
CreateMultipartUploadResponse2, PutObjectContentResponse, PutObjectResponse,
UploadPartResponse2,
};
pub use remove_objects::{
DeleteError, DeletedObject, RemoveObjectResponse2, RemoveObjectsResponse,
};
#[derive(Debug)]
/// Base response for bucket operation
@ -217,37 +221,6 @@ impl StatObjectResponse {
}
}
#[derive(Clone, Debug)]
/// Error defintion of [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API
pub struct DeleteError {
pub code: String,
pub message: String,
pub object_name: String,
pub version_id: Option<String>,
}
#[derive(Clone, Debug)]
/// Deleted object defintion of [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API
pub struct DeletedObject {
pub name: String,
pub version_id: Option<String>,
pub delete_marker: bool,
pub delete_marker_version_id: Option<String>,
}
#[derive(Clone, Debug)]
/// Response of [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API
pub struct RemoveObjectsApiResponse {
pub headers: HeaderMap,
pub region: String,
pub bucket_name: String,
pub objects: Vec<DeletedObject>,
pub errors: Vec<DeleteError>,
}
/// Response of [remove_objects()](crate::s3::client::Client::remove_objects) API
pub type RemoveObjectsResponse = RemoveObjectsApiResponse;
/// Response of [select_object_content()](crate::s3::client::Client::select_object_content) API
pub struct SelectObjectContentResponse {
pub headers: HeaderMap,

View File

@ -0,0 +1,146 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2022-2024 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Responses for RemoveObject APIs.
use async_trait::async_trait;
use bytes::Buf;
use http::HeaderMap;
use xmltree::Element;
use crate::s3::{
error::Error,
types::{FromS3Response, S3Request},
utils::{get_default_text, get_option_text, get_text},
};
#[derive(Debug, Clone)]
pub struct RemoveObjectResponse2 {
pub headers: HeaderMap,
/// Value of the `x-amz-delete-marker` header.
pub is_delete_marker: bool,
/// If a delete marker was created, this field will contain the version_id
/// of the delete marker. Value of the `x-amz-version-id` header.
pub version_id: Option<String>,
}
#[async_trait]
impl FromS3Response for RemoveObjectResponse2 {
async fn from_s3response<'a>(
_req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error> {
let headers = resp.headers().clone();
let is_delete_marker = headers
.get("x-amz-delete-marker")
.map(|v| v == "true")
.unwrap_or(false);
let version_id = headers
.get("x-amz-version-id")
.map(|v| v.to_str().unwrap().to_string());
Ok(RemoveObjectResponse2 {
headers,
is_delete_marker,
version_id,
})
}
}
/// Error info returned by the S3 API when an object could not be deleted.
#[derive(Clone, Debug)]
pub struct DeleteError {
pub code: String,
pub message: String,
pub object_name: String,
pub version_id: Option<String>,
}
/// Information about an object that was deleted.
#[derive(Clone, Debug)]
pub struct DeletedObject {
pub name: String,
pub version_id: Option<String>,
pub delete_marker: bool,
pub delete_marker_version_id: Option<String>,
}
/// Response of
/// [remove_objects_api()](crate::s3::client_core::ClientCore::delete_objects)
/// S3 API. It is also returned by the
/// [remove_objects()](crate::s3::client::Client::remove_objects) API in the
/// form of a stream.
#[derive(Clone, Debug)]
pub struct RemoveObjectsResponse {
pub headers: HeaderMap,
pub result: Vec<DeleteResult>,
}
#[derive(Clone, Debug)]
pub enum DeleteResult {
Deleted(DeletedObject),
Error(DeleteError),
}
impl DeleteResult {
pub fn is_deleted(&self) -> bool {
matches!(self, DeleteResult::Deleted(_))
}
pub fn is_error(&self) -> bool {
matches!(self, DeleteResult::Error(_))
}
}
#[async_trait]
impl FromS3Response for RemoveObjectsResponse {
async fn from_s3response<'a>(
_req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error> {
let headers = resp.headers().clone();
let body = resp.bytes().await?;
let root = Element::parse(body.reader())?;
let result = root
.children
.iter()
.map(|elem| elem.as_element().unwrap())
.map(|elem| {
if elem.name == "Deleted" {
Ok(DeleteResult::Deleted(DeletedObject {
name: get_text(&elem, "Key")?,
version_id: get_option_text(&elem, "VersionId"),
delete_marker: get_default_text(&elem, "DeleteMarker").to_lowercase()
== "true",
delete_marker_version_id: get_option_text(&elem, "DeleteMarkerVersionId"),
}))
} else {
assert_eq!(elem.name, "Error");
Ok(DeleteResult::Error(DeleteError {
code: get_text(&elem, "Code")?,
message: get_text(&elem, "Message")?,
object_name: get_text(&elem, "Key")?,
version_id: get_option_text(&elem, "VersionId"),
}))
}
})
.collect::<Result<Vec<DeleteResult>, Error>>()?;
Ok(RemoveObjectsResponse { headers, result })
}
}

View File

@ -230,13 +230,6 @@ pub fn parse_legal_hold(s: &str) -> Result<bool, Error> {
}
}
#[derive(Clone, Debug, Copy)]
/// Contains delete object name and optional version ID
pub struct DeleteObject<'a> {
pub name: &'a str,
pub version_id: Option<&'a str>,
}
#[derive(Clone, Debug)]
/// Compression types
pub enum CompressionType {

View File

@ -19,7 +19,7 @@ use chrono::Duration;
use futures_util::Stream;
use hyper::http::Method;
use minio::s3::builders::ObjectContent;
use minio::s3::builders::{ObjectContent, ObjectToDelete};
use rand::{
distributions::{Alphanumeric, DistString},
rngs::SmallRng,
@ -36,12 +36,13 @@ use tokio_stream::StreamExt;
use minio::s3::args::*;
use minio::s3::client::Client;
use minio::s3::creds::StaticProvider;
use minio::s3::error::Error;
use minio::s3::http::BaseUrl;
use minio::s3::types::ToStream;
use minio::s3::types::{
CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo,
NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields,
RetentionMode, SelectRequest, SuffixFilterRule,
CsvInputSerialization, CsvOutputSerialization, FileHeaderInfo, NotificationConfig,
ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, RetentionMode, SelectRequest,
SuffixFilterRule,
};
use minio::s3::types::{NotificationRecords, S3Api};
use minio::s3::utils::{to_iso8601utc, utc_now};
@ -277,9 +278,21 @@ impl ClientTest {
assert_eq!(resp.object_name, object_name);
assert_eq!(resp.size, size);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
// Validate delete succeeded.
let resp = self
.client
.stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await;
match resp.err().unwrap() {
Error::S3Error(er) => {
assert_eq!(er.code, "NoSuchKey")
}
_ => assert!(false),
}
}
async fn put_object_multipart(&self) {
@ -307,7 +320,8 @@ impl ClientTest {
assert_eq!(resp.object_name, object_name);
assert_eq!(resp.size, size);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -338,7 +352,8 @@ impl ClientTest {
assert_eq!(resp.size, *size as usize);
assert_eq!(resp.etag, etag);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -367,7 +382,8 @@ impl ClientTest {
assert_eq!(resp.size, *size as usize);
assert_eq!(resp.etag, etag);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -416,7 +432,8 @@ impl ClientTest {
assert_eq!(resp.size, sizes[idx] as usize);
assert_eq!(resp.etag, etag);
client
.remove_object(&RemoveObjectArgs::new(&test_bucket, &object_name).unwrap())
.remove_object(&test_bucket, object_name.as_str())
.send()
.await
.unwrap();
@ -453,7 +470,8 @@ impl ClientTest {
let got = resp.text().await.unwrap();
assert_eq!(got, data);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -475,7 +493,8 @@ impl ClientTest {
let got = resp.content.to_segmented_bytes().await.unwrap().to_bytes();
assert_eq!(got, data);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -513,12 +532,13 @@ impl ClientTest {
fs::remove_file(&filename).unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
@ -547,12 +567,13 @@ impl ClientTest {
fs::remove_file(&filename).unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -583,20 +604,27 @@ impl ClientTest {
.unwrap();
names.push(object_name);
}
let mut objects: Vec<DeleteObject> = Vec::new();
for name in names.iter() {
objects.push(DeleteObject {
name,
version_id: None,
});
}
let del_items: Vec<ObjectToDelete> = names
.iter()
.map(|v| ObjectToDelete::from(v.as_str()))
.collect();
self.client
.remove_objects(
&mut RemoveObjectsArgs::new(&self.test_bucket, &mut objects.iter()).unwrap(),
)
.await
.unwrap();
let mut resp = self
.client
.remove_objects(&self.test_bucket, del_items.into_iter())
.verbose_mode(true)
.to_stream()
.await;
let mut del_count = 0;
while let Some(item) = resp.next().await {
let res = item.unwrap();
for obj in res.result.iter() {
assert!(obj.is_deleted());
}
del_count += res.result.len();
}
assert_eq!(del_count, 3);
self.client
.remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap())
@ -647,21 +675,23 @@ impl ClientTest {
}
assert!(count == 3);
let mut objects: Vec<DeleteObject> = Vec::new();
for name in names.iter() {
objects.push(DeleteObject {
name,
version_id: None,
});
let del_items: Vec<ObjectToDelete> = names
.iter()
.map(|v| ObjectToDelete::from(v.as_str()))
.collect();
let mut resp = self
.client
.remove_objects(&self.test_bucket, del_items.into_iter())
.verbose_mode(true)
.to_stream()
.await;
while let Some(item) = resp.next().await {
let res = item.unwrap();
for obj in res.result.iter() {
assert!(obj.is_deleted());
}
}
self.client
.remove_objects(
&mut RemoveObjectsArgs::new(&self.test_bucket, &mut objects.iter()).unwrap(),
)
.await
.unwrap();
self.client
.remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap())
.await
@ -731,7 +761,8 @@ impl ClientTest {
}
assert_eq!(got, data);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -806,7 +837,8 @@ impl ClientTest {
.unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
@ -853,12 +885,13 @@ impl ClientTest {
assert_eq!(resp.size, size);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &src_object_name).unwrap())
.remove_object(&self.test_bucket, src_object_name.as_str())
.send()
.await
.unwrap();
}
@ -904,12 +937,13 @@ impl ClientTest {
assert_eq!(resp.size, 5);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &src_object_name).unwrap())
.remove_object(&self.test_bucket, src_object_name.as_str())
.send()
.await
.unwrap();
}
@ -1204,7 +1238,8 @@ impl ClientTest {
assert!(resp.tags.is_empty());
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.remove_object(&self.test_bucket, object_name.as_str())
.send()
.await
.unwrap();
}
@ -1319,10 +1354,14 @@ impl ClientTest {
assert!(resp.retention_mode.is_none());
assert!(resp.retain_until_date.is_none());
let mut args = RemoveObjectArgs::new(&bucket_name, &object_name).unwrap();
let version_id = obj_resp.version_id.unwrap().clone();
args.version_id = Some(version_id.as_str());
self.client.remove_object(&args).await.unwrap();
self.client
.remove_object(
&bucket_name,
(object_name.as_str(), obj_resp.version_id.as_deref()),
)
.send()
.await
.unwrap();
self.client
.remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap())