Add builder style for list_objects (#74)

- Also add `include_user_metadata` option for list object versions
This commit is contained in:
Aditya Manthramurthy 2024-04-02 17:39:40 -07:00 committed by GitHub
parent 75ea23aaf1
commit fc20535f1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1450 additions and 907 deletions

View File

@ -12,6 +12,7 @@ categories = ["api-bindings", "web-programming::http-client"]
[dependencies] [dependencies]
async-recursion = "1.0.4" async-recursion = "1.0.4"
async-trait = "0.1.73"
base64 = "0.21.3" base64 = "0.21.3"
byteorder = "1.4.3" byteorder = "1.4.3"
bytes = "1.4.0" bytes = "1.4.0"

View File

@ -986,271 +986,6 @@ impl<'a> RemoveObjectsArgs<'a> {
} }
} }
/// Argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API
#[derive(Clone, Debug)]
pub struct ListObjectsV1Args {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<String>,
pub marker: Option<String>,
}
// Helper function delimiter based on recursive flag when delimiter is not
// provided.
fn delim_helper(delim: Option<String>, recursive: bool) -> Option<String> {
if delim.is_some() {
return delim;
}
match recursive {
true => None,
false => Some(String::from("/")),
}
}
impl From<ListObjectsArgs> for ListObjectsV1Args {
fn from(value: ListObjectsArgs) -> Self {
ListObjectsV1Args {
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
use_url_encoding_type: value.use_url_encoding_type,
max_keys: value.max_keys,
prefix: value.prefix,
marker: value.marker,
}
}
}
impl ListObjectsV1Args {
/// Returns argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API with given bucket name
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// let args = ListObjectsV1Args::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &str) -> Result<ListObjectsV1Args, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectsV1Args {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name.to_owned(),
delimiter: None,
use_url_encoding_type: true,
max_keys: None,
prefix: None,
marker: None,
})
}
}
/// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API
#[derive(Clone, Debug)]
pub struct ListObjectsV2Args {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<String>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
pub fetch_owner: bool,
pub include_user_metadata: bool,
}
impl From<ListObjectsArgs> for ListObjectsV2Args {
fn from(value: ListObjectsArgs) -> Self {
ListObjectsV2Args {
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
use_url_encoding_type: value.use_url_encoding_type,
max_keys: value.max_keys,
prefix: value.prefix,
start_after: value.start_after,
continuation_token: value.continuation_token,
fetch_owner: value.fetch_owner,
include_user_metadata: value.include_user_metadata,
}
}
}
impl ListObjectsV2Args {
/// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// let args = ListObjectsV2Args::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &str) -> Result<ListObjectsV2Args, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectsV2Args {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name.to_owned(),
delimiter: None,
use_url_encoding_type: true,
max_keys: None,
prefix: None,
start_after: None,
continuation_token: None,
fetch_owner: false,
include_user_metadata: false,
})
}
}
/// Argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API
pub struct ListObjectVersionsArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<String>,
pub key_marker: Option<String>,
pub version_id_marker: Option<String>,
}
impl From<ListObjectsArgs> for ListObjectVersionsArgs {
fn from(value: ListObjectsArgs) -> Self {
ListObjectVersionsArgs {
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
use_url_encoding_type: value.use_url_encoding_type,
max_keys: value.max_keys,
prefix: value.prefix,
key_marker: value.key_marker,
version_id_marker: value.version_id_marker,
}
}
}
impl ListObjectVersionsArgs {
/// Returns argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API with given bucket name
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// let args = ListObjectVersionsArgs::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &str) -> Result<ListObjectVersionsArgs, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectVersionsArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name.to_owned(),
delimiter: None,
use_url_encoding_type: true,
max_keys: None,
prefix: None,
key_marker: None,
version_id_marker: None,
})
}
}
/// Argument for [list_objects()](crate::s3::client::Client::list_objects) API
#[derive(Clone, Debug)]
pub struct ListObjectsArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
/// Specifies the bucket name on which listing is to be performed.
pub bucket: String,
/// Delimiter to roll up common prefixes on.
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<String>,
/// Used only with ListObjectsV1.
pub marker: Option<String>,
/// Used only with ListObjectsV2
pub start_after: Option<String>,
/// Used only with ListObjectsV2.
pub continuation_token: Option<String>,
/// Used only with ListObjectsV2.
pub fetch_owner: bool,
/// MinIO extension for ListObjectsV2.
pub include_user_metadata: bool,
/// Used only with GetObjectVersions.
pub key_marker: Option<String>,
/// Used only with GetObjectVersions.
pub version_id_marker: Option<String>,
/// This parameter takes effect only when delimiter is None. Enables
/// recursive traversal for listing of the bucket and prefix.
pub recursive: bool,
/// Set this to use ListObjectsV1. Defaults to false.
pub use_api_v1: bool,
/// Set this to include versions.
pub include_versions: bool,
}
impl ListObjectsArgs {
/// Returns argument for [list_objects()](crate::s3::client::Client::list_objects) API with given bucket name and callback function for results.
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// let args = ListObjectsArgs::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &str) -> Result<ListObjectsArgs, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectsArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name.to_owned(),
delimiter: None,
use_url_encoding_type: true,
marker: None,
start_after: None,
key_marker: None,
max_keys: None,
prefix: None,
continuation_token: None,
fetch_owner: false,
version_id_marker: None,
include_user_metadata: false,
recursive: false,
use_api_v1: false,
include_versions: false,
})
}
}
/// Argument for [select_object_content()](crate::s3::client::Client::select_object_content) API /// Argument for [select_object_content()](crate::s3::client::Client::select_object_content) API
pub struct SelectObjectContentArgs<'a> { pub struct SelectObjectContentArgs<'a> {
pub extra_headers: Option<&'a Multimap>, pub extra_headers: Option<&'a Multimap>,

17
src/s3/builders.rs Normal file
View File

@ -0,0 +1,17 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs
mod list_objects;
pub use list_objects::*;

View File

@ -0,0 +1,746 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Argument builders for ListObject APIs.
use async_trait::async_trait;
use futures_util::{stream as futures_stream, Stream, StreamExt};
use http::Method;
use crate::s3::{
client::Client,
error::Error,
response::{
ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response,
ListObjectsV2Response,
},
types::{S3Api, S3Request, ToS3Request, ToStream},
utils::{check_bucket_name, merge, Multimap},
};
fn add_common_list_objects_query_params(
query_params: &mut Multimap,
delimiter: Option<&str>,
disable_url_encoding: bool,
max_keys: Option<u16>,
prefix: Option<&str>,
) {
query_params.insert(
String::from("delimiter"),
delimiter.unwrap_or("").to_string(),
);
query_params.insert(
String::from("max-keys"),
max_keys.unwrap_or(1000).to_string(),
);
query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string());
if !disable_url_encoding {
query_params.insert(String::from("encoding-type"), String::from("url"));
}
}
/// Argument builder for ListObjectsV1 S3 API, created by
/// [list_objects_v1()](crate::s3::client::Client::list_objects_v1).
#[derive(Clone, Debug, Default)]
pub struct ListObjectsV1 {
client: Option<Client>,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
delimiter: Option<String>,
disable_url_encoding: bool,
max_keys: Option<u16>,
prefix: Option<String>,
marker: Option<String>,
}
#[async_trait]
impl ToStream for ListObjectsV1 {
type Item = ListObjectsV1Response;
async fn to_stream(self) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send> {
Box::new(Box::pin(futures_stream::unfold(
(self.clone(), false),
move |(mut args, mut is_done)| async move {
if is_done {
return None;
}
let resp = args.send().await;
match resp {
Ok(resp) => {
args.marker = resp.next_marker.clone();
is_done = !resp.is_truncated;
Some((Ok(resp), (args, is_done)))
}
Err(e) => Some((Err(e), (args, true))),
}
},
)))
}
}
impl ToS3Request for ListObjectsV1 {
fn to_s3request<'a>(&'a self) -> Result<S3Request<'a>, Error> {
check_bucket_name(&self.bucket, true)?;
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
add_common_list_objects_query_params(
&mut query_params,
self.delimiter.as_deref(),
self.disable_url_encoding,
self.max_keys,
self.prefix.as_deref(),
);
if let Some(v) = &self.marker {
query_params.insert(String::from("marker"), v.to_string());
}
let req = S3Request::new(
self.client.as_ref().ok_or(Error::NoClientProvided)?,
Method::GET,
)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}
impl S3Api for ListObjectsV1 {
type S3Response = ListObjectsV1Response;
}
// Helper function delimiter based on recursive flag when delimiter is not
// provided.
fn delim_helper(delim: Option<String>, recursive: bool) -> Option<String> {
if delim.is_some() {
return delim;
}
match recursive {
true => None,
false => Some(String::from("/")),
}
}
impl From<ListObjects> for ListObjectsV1 {
fn from(value: ListObjects) -> Self {
ListObjectsV1 {
client: value.client,
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
disable_url_encoding: value.disable_url_encoding,
max_keys: value.max_keys,
prefix: value.prefix,
marker: value.marker,
}
}
}
impl ListObjectsV1 {
pub fn new(bucket: &str) -> Self {
Self {
bucket: bucket.to_owned(),
..Default::default()
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
pub fn delimiter(mut self, delimiter: Option<String>) -> Self {
self.delimiter = delimiter;
self
}
pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self {
self.disable_url_encoding = disable_url_encoding;
self
}
pub fn max_keys(mut self, max_keys: Option<u16>) -> Self {
self.max_keys = max_keys;
self
}
pub fn prefix(mut self, prefix: Option<String>) -> Self {
self.prefix = prefix;
self
}
pub fn marker(mut self, marker: Option<String>) -> Self {
self.marker = marker;
self
}
}
/// Argument builder for ListObjectsV2 S3 API, created by
/// [list_objects_v2()](crate::s3::client::Client::list_objects_v2).
#[derive(Clone, Debug, Default)]
pub struct ListObjectsV2 {
client: Option<Client>,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
delimiter: Option<String>,
disable_url_encoding: bool,
max_keys: Option<u16>,
prefix: Option<String>,
start_after: Option<String>,
continuation_token: Option<String>,
fetch_owner: bool,
include_user_metadata: bool,
}
#[async_trait]
impl ToStream for ListObjectsV2 {
type Item = ListObjectsV2Response;
async fn to_stream(self) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send> {
Box::new(Box::pin(futures_stream::unfold(
(self.clone(), false),
move |(mut args, mut is_done)| async move {
if is_done {
return None;
}
let resp = args.send().await;
match resp {
Ok(resp) => {
args.continuation_token = resp.next_continuation_token.clone();
is_done = !resp.is_truncated;
Some((Ok(resp), (args, is_done)))
}
Err(e) => Some((Err(e), (args, true))),
}
},
)))
}
}
impl S3Api for ListObjectsV2 {
type S3Response = ListObjectsV2Response;
}
impl ToS3Request for ListObjectsV2 {
fn to_s3request(&self) -> Result<S3Request, Error> {
check_bucket_name(&self.bucket, true)?;
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("list-type"), String::from("2"));
add_common_list_objects_query_params(
&mut query_params,
self.delimiter.as_deref(),
self.disable_url_encoding,
self.max_keys,
self.prefix.as_deref(),
);
if let Some(v) = &self.continuation_token {
query_params.insert(String::from("continuation-token"), v.to_string());
}
if self.fetch_owner {
query_params.insert(String::from("fetch-owner"), String::from("true"));
}
if let Some(v) = &self.start_after {
query_params.insert(String::from("start-after"), v.to_string());
}
if self.include_user_metadata {
query_params.insert(String::from("metadata"), String::from("true"));
}
let req = S3Request::new(
self.client.as_ref().ok_or(Error::NoClientProvided)?,
Method::GET,
)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}
impl From<ListObjects> for ListObjectsV2 {
fn from(value: ListObjects) -> Self {
ListObjectsV2 {
client: value.client,
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
disable_url_encoding: value.disable_url_encoding,
max_keys: value.max_keys,
prefix: value.prefix,
start_after: value.start_after,
continuation_token: value.continuation_token,
fetch_owner: value.fetch_owner,
include_user_metadata: value.include_user_metadata,
}
}
}
impl ListObjectsV2 {
pub fn new(bucket: &str) -> Self {
Self {
bucket: bucket.to_owned(),
..Default::default()
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
pub fn delimiter(mut self, delimiter: Option<String>) -> Self {
self.delimiter = delimiter;
self
}
pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self {
self.disable_url_encoding = disable_url_encoding;
self
}
pub fn max_keys(mut self, max_keys: Option<u16>) -> Self {
self.max_keys = max_keys;
self
}
pub fn prefix(mut self, prefix: Option<String>) -> Self {
self.prefix = prefix;
self
}
pub fn start_after(mut self, start_after: Option<String>) -> Self {
self.start_after = start_after;
self
}
pub fn continuation_token(mut self, continuation_token: Option<String>) -> Self {
self.continuation_token = continuation_token;
self
}
pub fn fetch_owner(mut self, fetch_owner: bool) -> Self {
self.fetch_owner = fetch_owner;
self
}
pub fn include_user_metadata(mut self, include_user_metadata: bool) -> Self {
self.include_user_metadata = include_user_metadata;
self
}
}
/// Argument builder for ListObjectVerions S3 API created by
/// [list_object_versions()](crate::s3::client::Client::list_object_versions).
#[derive(Clone, Debug, Default)]
pub struct ListObjectVersions {
client: Option<Client>,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
delimiter: Option<String>,
disable_url_encoding: bool,
max_keys: Option<u16>,
prefix: Option<String>,
key_marker: Option<String>,
version_id_marker: Option<String>,
include_user_metadata: bool,
}
#[async_trait]
impl ToStream for ListObjectVersions {
type Item = ListObjectVersionsResponse;
async fn to_stream(self) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send> {
Box::new(Box::pin(futures_stream::unfold(
(self.clone(), false),
move |(mut args, mut is_done)| async move {
if is_done {
return None;
}
let resp = args.send().await;
match resp {
Ok(resp) => {
args.key_marker = resp.next_key_marker.clone();
args.version_id_marker = resp.next_version_id_marker.clone();
is_done = !resp.is_truncated;
Some((Ok(resp), (args, is_done)))
}
Err(e) => Some((Err(e), (args, true))),
}
},
)))
}
}
impl S3Api for ListObjectVersions {
type S3Response = ListObjectVersionsResponse;
}
impl ToS3Request for ListObjectVersions {
fn to_s3request(&self) -> Result<S3Request, Error> {
check_bucket_name(&self.bucket, true)?;
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("versions"), String::new());
add_common_list_objects_query_params(
&mut query_params,
self.delimiter.as_deref(),
self.disable_url_encoding,
self.max_keys,
self.prefix.as_deref(),
);
if let Some(v) = &self.key_marker {
query_params.insert(String::from("key-marker"), v.to_string());
}
if let Some(v) = &self.version_id_marker {
query_params.insert(String::from("version-id-marker"), v.to_string());
}
if self.include_user_metadata {
query_params.insert(String::from("metadata"), String::from("true"));
}
let req = S3Request::new(
self.client.as_ref().ok_or(Error::NoClientProvided)?,
Method::GET,
)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}
impl From<ListObjects> for ListObjectVersions {
fn from(value: ListObjects) -> Self {
ListObjectVersions {
client: value.client,
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
disable_url_encoding: value.disable_url_encoding,
max_keys: value.max_keys,
prefix: value.prefix,
key_marker: value.key_marker,
version_id_marker: value.version_id_marker,
include_user_metadata: value.include_user_metadata,
}
}
}
impl ListObjectVersions {
pub fn new(bucket: &str) -> Self {
Self {
bucket: bucket.to_owned(),
..Default::default()
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
pub fn delimiter(mut self, delimiter: Option<String>) -> Self {
self.delimiter = delimiter;
self
}
pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self {
self.disable_url_encoding = disable_url_encoding;
self
}
pub fn max_keys(mut self, max_keys: Option<u16>) -> Self {
self.max_keys = max_keys;
self
}
pub fn prefix(mut self, prefix: Option<String>) -> Self {
self.prefix = prefix;
self
}
pub fn key_marker(mut self, key_marker: Option<String>) -> Self {
self.key_marker = key_marker;
self
}
pub fn version_id_marker(mut self, version_id_marker: Option<String>) -> Self {
self.version_id_marker = version_id_marker;
self
}
pub fn include_user_metadata(mut self, include_user_metadata: bool) -> Self {
self.include_user_metadata = include_user_metadata;
self
}
}
/// Argument builder for
/// [list_objects()](crate::s3::client::Client::list_objects) API.
///
/// Use the various builder methods to set parameters on the request. Finally to
/// send the request and consume the results use the `ToStream` instance to get
/// a stream of results. Pagination is automatically performed.
#[derive(Clone, Debug, Default)]
pub struct ListObjects {
client: Option<Client>,
// Parameters common to all ListObjects APIs.
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
delimiter: Option<String>,
disable_url_encoding: bool,
max_keys: Option<u16>,
prefix: Option<String>,
// Options specific to ListObjectsV1.
marker: Option<String>,
// Options specific to ListObjectsV2.
start_after: Option<String>,
continuation_token: Option<String>,
fetch_owner: bool,
include_user_metadata: bool,
// Options specific to ListObjectVersions.
key_marker: Option<String>,
version_id_marker: Option<String>,
// Higher level options.
recursive: bool,
use_api_v1: bool,
include_versions: bool,
}
#[async_trait]
impl ToStream for ListObjects {
type Item = ListObjectsResponse;
async fn to_stream(self) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send> {
if self.use_api_v1 {
let stream = ListObjectsV1::from(self).to_stream().await;
Box::new(stream.map(|v| v.map(|v| v.into())))
} else if self.include_versions {
let stream = ListObjectVersions::from(self).to_stream().await;
Box::new(stream.map(|v| v.map(|v| v.into())))
} else {
let stream = ListObjectsV2::from(self).to_stream().await;
Box::new(stream.map(|v| v.map(|v| v.into())))
}
}
}
impl ListObjects {
pub fn new(bucket: &str) -> Self {
Self {
bucket: bucket.to_owned(),
..Default::default()
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
/// Delimiter to roll up common prefixes on.
pub fn delimiter(mut self, delimiter: Option<String>) -> Self {
self.delimiter = delimiter;
self
}
/// Disable setting the `EncodingType` parameter in the ListObjects request.
/// By default it is set to `url`.
pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self {
self.disable_url_encoding = disable_url_encoding;
self
}
pub fn max_keys(mut self, max_keys: Option<u16>) -> Self {
self.max_keys = max_keys;
self
}
pub fn prefix(mut self, prefix: Option<String>) -> Self {
self.prefix = prefix;
self
}
/// Used only with ListObjectsV1.
pub fn marker(mut self, marker: Option<String>) -> Self {
self.marker = marker;
self
}
/// Used only with ListObjectsV2
pub fn start_after(mut self, start_after: Option<String>) -> Self {
self.start_after = start_after;
self
}
/// Used only with ListObjectsV2
pub fn continuation_token(mut self, continuation_token: Option<String>) -> Self {
self.continuation_token = continuation_token;
self
}
/// Used only with ListObjectsV2
pub fn fetch_owner(mut self, fetch_owner: bool) -> Self {
self.fetch_owner = fetch_owner;
self
}
/// Used only with ListObjectsV2. MinIO extension.
pub fn include_user_metadata(mut self, include_user_metadata: bool) -> Self {
self.include_user_metadata = include_user_metadata;
self
}
/// Used only with GetObjectVersions.
pub fn key_marker(mut self, key_marker: Option<String>) -> Self {
self.key_marker = key_marker;
self
}
/// Used only with GetObjectVersions.
pub fn version_id_marker(mut self, version_id_marker: Option<String>) -> Self {
self.version_id_marker = version_id_marker;
self
}
/// This parameter takes effect only when delimiter is None. Enables
/// recursive traversal for listing of the bucket and prefix.
pub fn recursive(mut self, recursive: bool) -> Self {
self.recursive = recursive;
self
}
/// Set this to use ListObjectsV1. Defaults to false.
pub fn use_api_v1(mut self, use_api_v1: bool) -> Self {
self.use_api_v1 = use_api_v1;
self
}
/// Set this to include versions.
pub fn include_versions(mut self, include_versions: bool) -> Self {
self.include_versions = include_versions;
self
}
}

View File

@ -386,7 +386,7 @@ impl Client {
pub async fn do_execute( pub async fn do_execute(
&self, &self,
method: Method, method: &Method,
region: &String, region: &String,
headers: &mut Multimap, headers: &mut Multimap,
query_params: &Multimap, query_params: &Multimap,
@ -398,7 +398,7 @@ impl Client {
let body = data.unwrap_or_default(); let body = data.unwrap_or_default();
let url = let url =
self.base_url self.base_url
.build_url(&method, region, query_params, bucket_name, object_name)?; .build_url(method, region, query_params, bucket_name, object_name)?;
self.build_headers(headers, query_params, region, &url, &method, body); self.build_headers(headers, query_params, region, &url, &method, body);
let mut req = self.client.request(method.clone(), url.to_string()); let mut req = self.client.request(method.clone(), url.to_string());
@ -409,7 +409,7 @@ impl Client {
} }
} }
if method == Method::PUT || method == Method::POST { if *method == Method::PUT || *method == Method::POST {
req = req.body(body.to_vec()); req = req.body(body.to_vec());
} }
@ -425,7 +425,7 @@ impl Client {
&mut body, &mut body,
status_code, status_code,
&header_map, &header_map,
&method, method,
&url.path, &url.path,
bucket_name, bucket_name,
object_name, object_name,
@ -458,7 +458,7 @@ impl Client {
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
let res = self let res = self
.do_execute( .do_execute(
method.clone(), &method,
region, region,
headers, headers,
query_params, query_params,
@ -482,7 +482,7 @@ impl Client {
// Retry only once on RetryHead error. // Retry only once on RetryHead error.
self.do_execute( self.do_execute(
method.clone(), &method,
region, region,
headers, headers,
query_params, query_params,

View File

@ -13,492 +13,22 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//! S3 API: ListObjectsV2, ListObjectsV1, ListObjectVersions and streaming helper. //! S3 APIs for listing objects.
use std::collections::HashMap;
use super::Client; use super::Client;
use crate::s3::{ use crate::s3::builders::{ListObjectVersions, ListObjects, ListObjectsV1, ListObjectsV2};
args::{ListObjectVersionsArgs, ListObjectsArgs, ListObjectsV1Args, ListObjectsV2Args},
error::Error,
response::{ListObjectVersionsResponse, ListObjectsV1Response, ListObjectsV2Response},
types::ListEntry,
utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, merge, urldecode, Multimap,
},
};
use bytes::Buf;
use futures_util::{stream as futures_stream, Stream, StreamExt};
use http::Method;
use xmltree::Element;
fn url_decode(
encoding_type: &Option<String>,
prefix: Option<String>,
) -> Result<Option<String>, Error> {
if let Some(v) = encoding_type.as_ref() {
if v == "url" {
if let Some(v) = prefix {
return Ok(Some(urldecode(&v)?.to_string()));
}
}
}
if let Some(v) = prefix.as_ref() {
return Ok(Some(v.to_string()));
}
Ok(None)
}
fn add_common_list_objects_query_params(
query_params: &mut Multimap,
delimiter: Option<&str>,
use_url_encoding_type: bool,
max_keys: Option<u16>,
prefix: Option<&str>,
) {
query_params.insert(
String::from("delimiter"),
delimiter.unwrap_or("").to_string(),
);
query_params.insert(
String::from("max-keys"),
max_keys.unwrap_or(1000).to_string(),
);
query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string());
if use_url_encoding_type {
query_params.insert(String::from("encoding-type"), String::from("url"));
}
}
fn parse_common_list_objects_response(
root: &Element,
) -> Result<
(
String,
Option<String>,
Option<String>,
Option<String>,
bool,
Option<u16>,
),
Error,
> {
let encoding_type = get_option_text(root, "EncodingType");
let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?;
Ok((
get_text(root, "Name")?,
encoding_type,
prefix,
get_option_text(root, "Delimiter"),
match get_option_text(root, "IsTruncated") {
Some(v) => v.to_lowercase() == "true",
None => false,
},
match get_option_text(root, "MaxKeys") {
Some(v) => Some(v.parse::<u16>()?),
None => None,
},
))
}
fn parse_list_objects_contents(
contents: &mut Vec<ListEntry>,
root: &mut xmltree::Element,
tag: &str,
encoding_type: &Option<String>,
is_delete_marker: bool,
) -> Result<(), Error> {
while let Some(v) = root.take_child(tag) {
let content = v;
let etype = encoding_type.as_ref().cloned();
let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap();
let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?);
let etag = get_option_text(&content, "ETag");
let v = get_default_text(&content, "Size");
let size = match v.is_empty() {
true => None,
false => Some(v.parse::<usize>()?),
};
let storage_class = get_option_text(&content, "StorageClass");
let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true";
let version_id = get_option_text(&content, "VersionId");
let (owner_id, owner_name) = match content.get_child("Owner") {
Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")),
None => (None, None),
};
let user_metadata = match content.get_child("UserMetadata") {
Some(v) => {
let mut map: HashMap<String, String> = HashMap::new();
for xml_node in &v.children {
let u = xml_node
.as_element()
.ok_or(Error::XmlError("unable to convert to element".to_string()))?;
map.insert(
u.name.to_string(),
u.get_text().unwrap_or_default().to_string(),
);
}
Some(map)
}
None => None,
};
contents.push(ListEntry {
name: key,
last_modified,
etag,
owner_id,
owner_name,
size,
storage_class,
is_latest,
version_id,
user_metadata,
is_prefix: false,
is_delete_marker,
encoding_type: etype,
});
}
Ok(())
}
fn parse_list_objects_common_prefixes(
contents: &mut Vec<ListEntry>,
root: &mut Element,
encoding_type: &Option<String>,
) -> Result<(), Error> {
while let Some(v) = root.take_child("CommonPrefixes") {
let common_prefix = v;
contents.push(ListEntry {
name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(),
last_modified: None,
etag: None,
owner_id: None,
owner_name: None,
size: None,
storage_class: None,
is_latest: false,
version_id: None,
user_metadata: None,
is_prefix: true,
is_delete_marker: false,
encoding_type: encoding_type.as_ref().cloned(),
});
}
Ok(())
}
impl Client { impl Client {
/// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API pub fn list_objects_v1(&self, bucket: &str) -> ListObjectsV1 {
pub async fn list_objects_v1( ListObjectsV1::new(bucket).client(self)
&self,
args: &ListObjectsV1Args,
) -> Result<ListObjectsV1Response, Error> {
let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
add_common_list_objects_query_params(
&mut query_params,
args.delimiter.as_deref(),
args.use_url_encoding_type,
args.max_keys,
args.prefix.as_deref(),
);
if let Some(v) = &args.marker {
query_params.insert(String::from("marker"), v.to_string());
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?;
let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?;
let mut contents: Vec<ListEntry> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
if is_truncated && next_marker.is_none() {
next_marker = contents.last().map(|v| v.name.clone())
}
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
Ok(ListObjectsV1Response {
headers: header_map,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
marker,
next_marker,
})
} }
/// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API pub fn list_objects_v2(&self, bucket: &str) -> ListObjectsV2 {
pub async fn list_objects_v2( ListObjectsV2::new(bucket).client(self)
&self,
args: &ListObjectsV2Args,
) -> Result<ListObjectsV2Response, Error> {
let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("list-type"), String::from("2"));
add_common_list_objects_query_params(
&mut query_params,
args.delimiter.as_deref(),
args.use_url_encoding_type,
args.max_keys,
args.prefix.as_deref(),
);
if let Some(v) = &args.continuation_token {
query_params.insert(String::from("continuation-token"), v.to_string());
}
if args.fetch_owner {
query_params.insert(String::from("fetch-owner"), String::from("true"));
}
if let Some(v) = &args.start_after {
query_params.insert(String::from("start-after"), v.to_string());
}
if args.include_user_metadata {
query_params.insert(String::from("metadata"), String::from("true"));
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let text = get_option_text(&root, "KeyCount");
let key_count = match text {
Some(v) => match v.is_empty() {
true => None,
false => Some(v.parse::<u16>()?),
},
None => None,
};
let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?;
let continuation_token = get_option_text(&root, "ContinuationToken");
let next_continuation_token = get_option_text(&root, "NextContinuationToken");
let mut contents: Vec<ListEntry> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
Ok(ListObjectsV2Response {
headers: header_map,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
key_count,
start_after,
continuation_token,
next_continuation_token,
})
} }
/// Executes [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) S3 API pub fn list_object_versions(&self, bucket: &str) -> ListObjectVersions {
pub async fn list_object_versions( ListObjectVersions::new(bucket).client(self)
&self,
args: &ListObjectVersionsArgs,
) -> Result<ListObjectVersionsResponse, Error> {
let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("versions"), String::new());
add_common_list_objects_query_params(
&mut query_params,
args.delimiter.as_deref(),
args.use_url_encoding_type,
args.max_keys,
args.prefix.as_deref(),
);
if let Some(v) = &args.key_marker {
query_params.insert(String::from("key-marker"), v.to_string());
}
if let Some(v) = &args.version_id_marker {
query_params.insert(String::from("version-id-marker"), v.to_string());
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?;
let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?;
let version_id_marker = get_option_text(&root, "VersionIdMarker");
let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker");
let mut contents: Vec<ListEntry> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?;
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
parse_list_objects_contents(
&mut contents,
&mut root,
"DeleteMarker",
&encoding_type,
true,
)?;
Ok(ListObjectVersionsResponse {
headers: header_map,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
key_marker,
next_key_marker,
version_id_marker,
next_version_id_marker,
})
}
async fn list_objects_v1_stream(
&self,
args: ListObjectsV1Args,
) -> impl Stream<Item = Result<ListObjectsV1Response, Error>> + Unpin {
Box::pin(futures_stream::unfold(
(self.clone(), args),
move |(client, mut args)| async move {
let resp = client.list_objects_v1(&args).await;
match resp {
Ok(resp) => {
if !resp.is_truncated {
None
} else {
args.marker = resp.next_marker.clone();
Some((Ok(resp), (client, args)))
}
}
Err(e) => Some((Err(e), (client, args))),
}
},
))
}
async fn list_objects_v2_stream(
&self,
args: ListObjectsV2Args,
) -> impl Stream<Item = Result<ListObjectsV2Response, Error>> + Unpin {
Box::pin(futures_stream::unfold(
(self.clone(), args),
move |(client, mut args)| async move {
let resp = client.list_objects_v2(&args).await;
match resp {
Ok(resp) => {
if !resp.is_truncated {
None
} else {
args.continuation_token = resp.next_continuation_token.clone();
Some((Ok(resp), (client, args)))
}
}
Err(e) => Some((Err(e), (client, args))),
}
},
))
}
async fn list_object_versions_stream(
&self,
args: ListObjectVersionsArgs,
) -> impl Stream<Item = Result<ListObjectVersionsResponse, Error>> + Unpin {
Box::pin(futures_stream::unfold(
(self.clone(), args),
move |(client, mut args)| async move {
let resp = client.list_object_versions(&args).await;
match resp {
Ok(resp) => {
if !resp.is_truncated {
None
} else {
args.key_marker = resp.next_key_marker.clone();
args.version_id_marker = resp.next_version_id_marker.clone();
Some((Ok(resp), (client, args)))
}
}
Err(e) => Some((Err(e), (client, args))),
}
},
))
} }
/// List objects with version information optionally. This function handles /// List objects with version information optionally. This function handles
@ -511,7 +41,7 @@ impl Client {
/// use minio::s3::client::{Client, ClientBuilder}; /// use minio::s3::client::{Client, ClientBuilder};
/// use minio::s3::creds::StaticProvider; /// use minio::s3::creds::StaticProvider;
/// use minio::s3::http::BaseUrl; /// use minio::s3::http::BaseUrl;
/// use minio::s3::args::ListObjectsArgs; /// use minio::s3::types::ToStream;
/// use futures_util::StreamExt; /// use futures_util::StreamExt;
/// ///
/// #[tokio::main] /// #[tokio::main]
@ -529,13 +59,15 @@ impl Client {
/// .unwrap(); /// .unwrap();
/// ///
/// // List all objects in a directory. /// // List all objects in a directory.
/// let mut list_objects_arg = ListObjectsArgs::new("my-bucket").unwrap(); /// let mut list_objects = client
/// list_objects_arg.recursive = true; /// .list_objects("my-bucket")
/// let mut stream = client.list_objects(list_objects_arg).await; /// .recursive(true)
/// while let Some(result) = stream.next().await { /// .to_stream()
/// .await;
/// while let Some(result) = list_objects.next().await {
/// match result { /// match result {
/// Ok(items) => { /// Ok(resp) => {
/// for item in items { /// for item in resp.contents {
/// println!("{:?}", item); /// println!("{:?}", item);
/// } /// }
/// } /// }
@ -543,19 +75,7 @@ impl Client {
/// } /// }
/// } /// }
/// } /// }
pub async fn list_objects( pub fn list_objects(&self, bucket: &str) -> ListObjects {
&self, ListObjects::new(bucket).client(self)
args: ListObjectsArgs,
) -> Box<dyn Stream<Item = Result<Vec<ListEntry>, Error>> + Unpin> {
if args.include_versions {
let stream = self.list_object_versions_stream(args.into()).await;
Box::new(stream.map(|v| v.map(|v| v.contents)))
} else if args.use_api_v1 {
let stream = self.list_objects_v1_stream(args.into()).await;
Box::new(stream.map(|v| v.map(|v| v.contents)))
} else {
let stream = self.list_objects_v2_stream(args.into()).await;
Box::new(stream.map(|v| v.map(|v| v.contents)))
}
} }
} }

View File

@ -109,6 +109,7 @@ pub enum Error {
InvalidFilter, InvalidFilter,
PostPolicyError(String), PostPolicyError(String),
InvalidObjectLockConfig(String), InvalidObjectLockConfig(String),
NoClientProvided,
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}
@ -116,7 +117,7 @@ impl std::error::Error for Error {}
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
Error::TimeParseError(e) => write!(f, "{}", e), Error::TimeParseError(e) => write!(f, "{}", e),
Error::InvalidUrl(e) => write!(f, "{}", e), Error::InvalidUrl(e) => write!(f, "{}", e),
Error::IOError(e) => write!(f, "{}", e), Error::IOError(e) => write!(f, "{}", e),
Error::XmlParseError(e) => write!(f, "{}", e), Error::XmlParseError(e) => write!(f, "{}", e),
@ -124,53 +125,96 @@ impl fmt::Display for Error {
Error::StrError(e) => write!(f, "{}", e), Error::StrError(e) => write!(f, "{}", e),
Error::IntError(e) => write!(f, "{}", e), Error::IntError(e) => write!(f, "{}", e),
Error::BoolError(e) => write!(f, "{}", e), Error::BoolError(e) => write!(f, "{}", e),
Error::Utf8Error(e) => write!(f, "{}", e), Error::Utf8Error(e) => write!(f, "{}", e),
Error::JsonError(e) => write!(f, "{}", e), Error::JsonError(e) => write!(f, "{}", e),
Error::XmlError(m) => write!(f, "{}", m), Error::XmlError(m) => write!(f, "{}", m),
Error::InvalidBucketName(m) => write!(f, "{}", m), Error::InvalidBucketName(m) => write!(f, "{}", m),
Error::InvalidObjectName(m) => write!(f, "{}", m), Error::InvalidObjectName(m) => write!(f, "{}", m),
Error::InvalidUploadId(m) => write!(f, "{}", m), Error::InvalidUploadId(m) => write!(f, "{}", m),
Error::InvalidPartNumber(m) => write!(f, "{}", m), Error::InvalidPartNumber(m) => write!(f, "{}", m),
Error::EmptyParts(m) => write!(f, "{}", m), Error::EmptyParts(m) => write!(f, "{}", m),
Error::InvalidRetentionMode(m) => write!(f, "invalid retention mode {}", m), Error::InvalidRetentionMode(m) => write!(f, "invalid retention mode {}", m),
Error::InvalidRetentionConfig(m) => write!(f, "invalid retention configuration; {}", m), Error::InvalidRetentionConfig(m) => write!(f, "invalid retention configuration; {}", m),
Error::InvalidMinPartSize(s) => write!(f, "part size {} is not supported; minimum allowed 5MiB", s), Error::InvalidMinPartSize(s) => {
Error::InvalidMaxPartSize(s) => write!(f, "part size {} is not supported; maximum allowed 5GiB", s), write!(f, "part size {} is not supported; minimum allowed 5MiB", s)
Error::InvalidObjectSize(s) => write!(f, "object size {} is not supported; maximum allowed 5TiB", s), }
Error::MissingPartSize => write!(f, "valid part size must be provided when object size is unknown"), Error::InvalidMaxPartSize(s) => {
Error::InvalidPartCount(os, ps, pc) => write!(f, "object size {} and part size {} make more than {} parts for upload", os, ps, pc), write!(f, "part size {} is not supported; maximum allowed 5GiB", s)
Error::SseTlsRequired(m) => write!(f, "{}SSE operation must be performed over a secure connection", m.as_ref().map_or(String::new(), |v| v.clone())), }
Error::InsufficientData(ps, br) => write!(f, "not enough data in the stream; expected: {}, got: {} bytes", ps, br), Error::InvalidObjectSize(s) => write!(
Error::InvalidBaseUrl(m) => write!(f, "{}", m), f,
Error::UrlBuildError(m) => write!(f, "{}", m), "object size {} is not supported; maximum allowed 5TiB",
Error::InvalidLegalHold(s) => write!(f, "invalid legal hold {}", s), s
Error::RegionMismatch(br, r) => write!(f, "region must be {}, but passed {}", br, r), ),
Error::S3Error(er) => write!(f, "s3 operation failed; code: {}, message: {}, resource: {}, request_id: {}, host_id: {}, bucket_name: {}, object_name: {}", er.code, er.message, er.resource, er.request_id, er.host_id, er.bucket_name, er.object_name), Error::MissingPartSize => write!(
Error::InvalidResponse(sc, ct) => write!(f, "invalid response received; status code: {}; content-type: {}", sc, ct), f,
Error::ServerError(sc) => write!(f, "server failed with HTTP status code {}", sc), "valid part size must be provided when object size is unknown"
Error::InvalidSelectExpression(m) => write!(f, "{}", m), ),
Error::InvalidHeaderValueType(v) => write!(f, "invalid header value type {}", v), Error::InvalidPartCount(os, ps, pc) => write!(
Error::CrcMismatch(t, e, g) => write!(f, "{} CRC mismatch; expected: {}, got: {}", t, e, g), f,
Error::UnknownEventType(et) => write!(f, "unknown event type {}", et), "object size {} and part size {} make more than {} parts for upload",
Error::SelectError(ec, em) => write!(f, "error code: {}, error message: {}", ec, em), os, ps, pc
Error::UnsupportedApi(a) => write!(f, "{} API is not supported in Amazon AWS S3", a), ),
Error::InvalidComposeSource(m) => write!(f, "{}", m), Error::SseTlsRequired(m) => write!(
Error::InvalidComposeSourceOffset(b, o, v, of, os) => write!(f, "source {}/{}{}: offset {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), of, os), f,
Error::InvalidComposeSourceLength(b, o, v, l, os) => write!(f, "source {}/{}{}: length {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), l, os), "{}SSE operation must be performed over a secure connection",
Error::InvalidComposeSourceSize(b, o, v, cs, os) => write!(f, "source {}/{}{}: compose size {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), cs, os), m.as_ref().map_or(String::new(), |v| v.clone())
Error::InvalidDirective(m) => write!(f, "{}", m), ),
Error::InvalidCopyDirective(m) => write!(f, "{}", m), Error::InsufficientData(ps, br) => write!(
Error::InvalidComposeSourcePartSize(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} must be greater than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, es), f,
Error::InvalidComposeSourceMultipart(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} for multipart split upload of {}, last part size is less than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, s, es), "not enough data in the stream; expected: {}, got: {} bytes",
Error::InvalidMultipartCount(c) => write!(f, "Compose sources create more than allowed multipart count {}", c), ps, br
Error::MissingLifecycleAction => write!(f, "at least one of action (AbortIncompleteMultipartUpload, Expiration, NoncurrentVersionExpiration, NoncurrentVersionTransition or Transition) must be specified in a rule"), ),
Error::InvalidExpiredObjectDeleteMarker => write!(f, "ExpiredObjectDeleteMarker must not be provided along with Date and Days"), Error::InvalidBaseUrl(m) => write!(f, "{}", m),
Error::InvalidDateAndDays(m) => write!(f, "Only one of date or days of {} must be set", m), Error::UrlBuildError(m) => write!(f, "{}", m),
Error::InvalidLifecycleRuleId => write!(f, "id must be exceed 255 characters"), Error::InvalidLegalHold(s) => write!(f, "invalid legal hold {}", s),
Error::InvalidFilter => write!(f, "only one of And, Prefix or Tag must be provided"), Error::RegionMismatch(br, r) => write!(f, "region must be {}, but passed {}", br, r),
Error::PostPolicyError(m) => write!(f, "{}", m), Error::S3Error(er) => write!(
Error::InvalidObjectLockConfig(m) => write!(f, "{}", m), f,
} "s3 operation failed; code: {}, message: {}, resource: {}, request_id: {}, host_id: {}, bucket_name: {}, object_name: {}",
er.code, er.message, er.resource, er.request_id, er.host_id, er.bucket_name, er.object_name,
),
Error::InvalidResponse(sc, ct) => write!(
f,
"invalid response received; status code: {}; content-type: {}",
sc, ct
),
Error::ServerError(sc) => write!(f, "server failed with HTTP status code {}", sc),
Error::InvalidSelectExpression(m) => write!(f, "{}", m),
Error::InvalidHeaderValueType(v) => write!(f, "invalid header value type {}", v),
Error::CrcMismatch(t, e, g) => {
write!(f, "{} CRC mismatch; expected: {}, got: {}", t, e, g)
}
Error::UnknownEventType(et) => write!(f, "unknown event type {}", et),
Error::SelectError(ec, em) => write!(f, "error code: {}, error message: {}", ec, em),
Error::UnsupportedApi(a) => write!(f, "{} API is not supported in Amazon AWS S3", a),
Error::InvalidComposeSource(m) => write!(f, "{}", m),
Error::InvalidComposeSourceOffset(b, o, v, of, os) => write!(f, "source {}/{}{}: offset {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), of, os),
Error::InvalidComposeSourceLength(b, o, v, l, os) => write!(f, "source {}/{}{}: length {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), l, os),
Error::InvalidComposeSourceSize(b, o, v, cs, os) => write!(f, "source {}/{}{}: compose size {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), cs, os),
Error::InvalidDirective(m) => write!(f, "{}", m),
Error::InvalidCopyDirective(m) => write!(f, "{}", m),
Error::InvalidComposeSourcePartSize(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} must be greater than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, es),
Error::InvalidComposeSourceMultipart(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} for multipart split upload of {}, last part size is less than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, s, es),
Error::InvalidMultipartCount(c) => write!(
f,
"Compose sources create more than allowed multipart count {}",
c
),
Error::MissingLifecycleAction => write!(f, "at least one of action (AbortIncompleteMultipartUpload, Expiration, NoncurrentVersionExpiration, NoncurrentVersionTransition or Transition) must be specified in a rule"),
Error::InvalidExpiredObjectDeleteMarker => write!(
f,
"ExpiredObjectDeleteMarker must not be provided along with Date and Days"
),
Error::InvalidDateAndDays(m) => {
write!(f, "Only one of date or days of {} must be set", m)
}
Error::InvalidLifecycleRuleId => write!(f, "id must be exceed 255 characters"),
Error::InvalidFilter => write!(f, "only one of And, Prefix or Tag must be provided"),
Error::PostPolicyError(m) => write!(f, "{}", m),
Error::InvalidObjectLockConfig(m) => write!(f, "{}", m),
Error::NoClientProvided => write!(f, "no client provided"),
}
} }
} }

View File

@ -16,6 +16,7 @@
//! Implementation of Simple Storage Service (aka S3) client //! Implementation of Simple Storage Service (aka S3) client
pub mod args; pub mod args;
pub mod builders;
pub mod client; pub mod client;
pub mod creds; pub mod creds;
pub mod error; pub mod error;

View File

@ -15,19 +15,27 @@
//! Responses for [minio::s3::client::Client](crate::s3::client::Client) APIs //! Responses for [minio::s3::client::Client](crate::s3::client::Client) APIs
use std::collections::HashMap;
use std::collections::VecDeque;
use reqwest::header::HeaderMap;
use std::io::BufReader;
use xmltree::Element;
use crate::s3::error::Error; use crate::s3::error::Error;
use crate::s3::types::{ use crate::s3::types::{
parse_legal_hold, Bucket, LifecycleConfig, ListEntry, NotificationConfig, ObjectLockConfig, parse_legal_hold, Bucket, LifecycleConfig, NotificationConfig, ObjectLockConfig,
ReplicationConfig, RetentionMode, SelectProgress, SseConfig, ReplicationConfig, RetentionMode, SelectProgress, SseConfig,
}; };
use crate::s3::utils::{ use crate::s3::utils::{
copy_slice, crc32, from_http_header_value, from_iso8601utc, get_text, uint32, UtcTime, copy_slice, crc32, from_http_header_value, from_iso8601utc, get_text, uint32, UtcTime,
}; };
use reqwest::header::HeaderMap;
use std::collections::HashMap; mod list_objects;
use std::collections::VecDeque;
use std::io::BufReader; pub use list_objects::{
use xmltree::Element; ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response,
};
#[derive(Debug)] #[derive(Debug)]
/// Response of [list_buckets()](crate::s3::client::Client::list_buckets) API /// Response of [list_buckets()](crate::s3::client::Client::list_buckets) API
@ -235,84 +243,6 @@ pub struct RemoveObjectsApiResponse {
/// Response of [remove_objects()](crate::s3::client::Client::remove_objects) API /// Response of [remove_objects()](crate::s3::client::Client::remove_objects) API
pub type RemoveObjectsResponse = RemoveObjectsApiResponse; pub type RemoveObjectsResponse = RemoveObjectsApiResponse;
#[derive(Clone, Debug)]
/// Response of [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API
pub struct ListObjectsV1Response {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
pub marker: Option<String>,
pub next_marker: Option<String>,
}
#[derive(Clone, Debug)]
/// Response of [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API
pub struct ListObjectsV2Response {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
pub key_count: Option<u16>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
pub next_continuation_token: Option<String>,
}
#[derive(Clone, Debug)]
/// Response of [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API
pub struct ListObjectVersionsResponse {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
pub key_marker: Option<String>,
pub next_key_marker: Option<String>,
pub version_id_marker: Option<String>,
pub next_version_id_marker: Option<String>,
}
#[derive(Clone, Debug)]
/// Response of [list_objects()](crate::s3::client::Client::list_objects) API
pub struct ListObjectsResponse {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
// ListObjectsV1
pub marker: String,
pub next_marker: String,
// ListObjectsV2
pub key_count: u16,
pub start_after: String,
pub continuation_token: String,
pub next_continuation_token: String,
// ListObjectVersions
pub key_marker: String,
pub next_key_marker: String,
pub version_id_marker: String,
pub next_version_id_marker: String,
}
/// Response of [select_object_content()](crate::s3::client::Client::select_object_content) API /// Response of [select_object_content()](crate::s3::client::Client::select_object_content) API
pub struct SelectObjectContentResponse { pub struct SelectObjectContentResponse {
pub headers: HeaderMap, pub headers: HeaderMap,

View File

@ -0,0 +1,425 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Response types for ListObjects APIs
use std::collections::HashMap;
use async_trait::async_trait;
use bytes::Buf;
use reqwest::header::HeaderMap;
use xmltree::Element;
use crate::s3::{
error::Error,
types::{FromS3Response, ListEntry, S3Request},
utils::{from_iso8601utc, get_default_text, get_option_text, get_text, urldecode},
};
fn url_decode(
encoding_type: &Option<String>,
prefix: Option<String>,
) -> Result<Option<String>, Error> {
if let Some(v) = encoding_type.as_ref() {
if v == "url" {
if let Some(v) = prefix {
return Ok(Some(urldecode(&v)?.to_string()));
}
}
}
if let Some(v) = prefix.as_ref() {
return Ok(Some(v.to_string()));
}
Ok(None)
}
fn parse_common_list_objects_response(
root: &Element,
) -> Result<
(
String,
Option<String>,
Option<String>,
Option<String>,
bool,
Option<u16>,
),
Error,
> {
let encoding_type = get_option_text(root, "EncodingType");
let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?;
Ok((
get_text(root, "Name")?,
encoding_type,
prefix,
get_option_text(root, "Delimiter"),
match get_option_text(root, "IsTruncated") {
Some(v) => v.to_lowercase() == "true",
None => false,
},
match get_option_text(root, "MaxKeys") {
Some(v) => Some(v.parse::<u16>()?),
None => None,
},
))
}
fn parse_list_objects_contents(
contents: &mut Vec<ListEntry>,
root: &mut xmltree::Element,
tag: &str,
encoding_type: &Option<String>,
is_delete_marker: bool,
) -> Result<(), Error> {
while let Some(v) = root.take_child(tag) {
let content = v;
let etype = encoding_type.as_ref().cloned();
let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap();
let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?);
let etag = get_option_text(&content, "ETag");
let v = get_default_text(&content, "Size");
let size = match v.is_empty() {
true => None,
false => Some(v.parse::<usize>()?),
};
let storage_class = get_option_text(&content, "StorageClass");
let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true";
let version_id = get_option_text(&content, "VersionId");
let (owner_id, owner_name) = match content.get_child("Owner") {
Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")),
None => (None, None),
};
let user_metadata = match content.get_child("UserMetadata") {
Some(v) => {
let mut map: HashMap<String, String> = HashMap::new();
for xml_node in &v.children {
let u = xml_node
.as_element()
.ok_or(Error::XmlError("unable to convert to element".to_string()))?;
map.insert(
u.name.to_string(),
u.get_text().unwrap_or_default().to_string(),
);
}
Some(map)
}
None => None,
};
contents.push(ListEntry {
name: key,
last_modified,
etag,
owner_id,
owner_name,
size,
storage_class,
is_latest,
version_id,
user_metadata,
is_prefix: false,
is_delete_marker,
encoding_type: etype,
});
}
Ok(())
}
fn parse_list_objects_common_prefixes(
contents: &mut Vec<ListEntry>,
root: &mut Element,
encoding_type: &Option<String>,
) -> Result<(), Error> {
while let Some(v) = root.take_child("CommonPrefixes") {
let common_prefix = v;
contents.push(ListEntry {
name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(),
last_modified: None,
etag: None,
owner_id: None,
owner_name: None,
size: None,
storage_class: None,
is_latest: false,
version_id: None,
user_metadata: None,
is_prefix: true,
is_delete_marker: false,
encoding_type: encoding_type.as_ref().cloned(),
});
}
Ok(())
}
/// Response of [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API
#[derive(Clone, Debug)]
pub struct ListObjectsV1Response {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
pub marker: Option<String>,
pub next_marker: Option<String>,
}
#[async_trait]
impl FromS3Response for ListObjectsV1Response {
async fn from_s3response<'a>(
_req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error> {
let headers = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?;
let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?;
let mut contents: Vec<ListEntry> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
if is_truncated && next_marker.is_none() {
next_marker = contents.last().map(|v| v.name.clone())
}
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
Ok(ListObjectsV1Response {
headers,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
marker,
next_marker,
})
}
}
/// Response of [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API
#[derive(Clone, Debug)]
pub struct ListObjectsV2Response {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
pub key_count: Option<u16>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
pub next_continuation_token: Option<String>,
}
#[async_trait]
impl FromS3Response for ListObjectsV2Response {
async fn from_s3response<'a>(
_req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error> {
let headers = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let text = get_option_text(&root, "KeyCount");
let key_count = match text {
Some(v) => match v.is_empty() {
true => None,
false => Some(v.parse::<u16>()?),
},
None => None,
};
let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?;
let continuation_token = get_option_text(&root, "ContinuationToken");
let next_continuation_token = get_option_text(&root, "NextContinuationToken");
let mut contents: Vec<ListEntry> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
Ok(ListObjectsV2Response {
headers,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
key_count,
start_after,
continuation_token,
next_continuation_token,
})
}
}
/// Response of [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API
#[derive(Clone, Debug)]
pub struct ListObjectVersionsResponse {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
pub key_marker: Option<String>,
pub next_key_marker: Option<String>,
pub version_id_marker: Option<String>,
pub next_version_id_marker: Option<String>,
}
#[async_trait]
impl FromS3Response for ListObjectVersionsResponse {
async fn from_s3response<'a>(
_req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error> {
let headers = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?;
let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?;
let version_id_marker = get_option_text(&root, "VersionIdMarker");
let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker");
let mut contents: Vec<ListEntry> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?;
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
parse_list_objects_contents(
&mut contents,
&mut root,
"DeleteMarker",
&encoding_type,
true,
)?;
Ok(ListObjectVersionsResponse {
headers,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
key_marker,
next_key_marker,
version_id_marker,
next_version_id_marker,
})
}
}
/// Response of [list_objects()](crate::s3::client::Client::list_objects) API
#[derive(Clone, Debug, Default)]
pub struct ListObjectsResponse {
pub headers: HeaderMap,
pub name: String,
pub encoding_type: Option<String>,
pub prefix: Option<String>,
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<ListEntry>,
// ListObjectsV1
pub marker: Option<String>,
pub next_marker: Option<String>,
// ListObjectsV2
pub key_count: Option<u16>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
pub next_continuation_token: Option<String>,
// ListObjectVersions
pub key_marker: Option<String>,
pub next_key_marker: Option<String>,
pub version_id_marker: Option<String>,
pub next_version_id_marker: Option<String>,
}
impl From<ListObjectVersionsResponse> for ListObjectsResponse {
fn from(value: ListObjectVersionsResponse) -> Self {
ListObjectsResponse {
headers: value.headers,
name: value.name,
encoding_type: value.encoding_type,
prefix: value.prefix,
delimiter: value.delimiter,
is_truncated: value.is_truncated,
max_keys: value.max_keys,
contents: value.contents,
key_marker: value.key_marker,
next_key_marker: value.next_key_marker,
version_id_marker: value.version_id_marker,
next_version_id_marker: value.next_version_id_marker,
..Default::default()
}
}
}
impl From<ListObjectsV2Response> for ListObjectsResponse {
fn from(value: ListObjectsV2Response) -> Self {
ListObjectsResponse {
headers: value.headers,
name: value.name,
encoding_type: value.encoding_type,
prefix: value.prefix,
delimiter: value.delimiter,
is_truncated: value.is_truncated,
max_keys: value.max_keys,
contents: value.contents,
key_count: value.key_count,
start_after: value.start_after,
continuation_token: value.continuation_token,
next_continuation_token: value.next_continuation_token,
..Default::default()
}
}
}
impl From<ListObjectsV1Response> for ListObjectsResponse {
fn from(value: ListObjectsV1Response) -> Self {
ListObjectsResponse {
headers: value.headers,
name: value.name,
encoding_type: value.encoding_type,
prefix: value.prefix,
delimiter: value.delimiter,
is_truncated: value.is_truncated,
max_keys: value.max_keys,
contents: value.contents,
marker: value.marker,
next_marker: value.next_marker,
..Default::default()
}
}
}

View File

@ -15,14 +15,132 @@
//! Various types for S3 API requests and responses //! Various types for S3 API requests and responses
use super::client::Client;
use crate::s3::error::Error; use crate::s3::error::Error;
use crate::s3::utils::{ use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, to_iso8601utc, UtcTime, from_iso8601utc, get_default_text, get_option_text, get_text, to_iso8601utc, Multimap, UtcTime,
}; };
use async_trait::async_trait;
use futures_util::Stream;
use http::Method;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use xmltree::Element;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use xmltree::Element;
pub struct S3Request<'a> {
client: &'a Client,
method: Method,
region: Option<&'a str>,
bucket: Option<&'a str>,
object: Option<&'a str>,
query_params: Multimap,
headers: Multimap,
body: Option<Vec<u8>>,
// Computed region
inner_region: String,
}
impl<'a> S3Request<'a> {
pub fn new(client: &'a Client, method: Method) -> S3Request<'a> {
S3Request {
client,
method,
region: None,
bucket: None,
object: None,
query_params: Multimap::new(),
headers: Multimap::new(),
body: None,
inner_region: String::new(),
}
}
pub fn region(mut self, region: Option<&'a str>) -> Self {
self.region = region;
self
}
pub fn bucket(mut self, bucket: Option<&'a str>) -> Self {
self.bucket = bucket;
self
}
pub fn object(mut self, object: Option<&'a str>) -> Self {
self.object = object;
self
}
pub fn query_params(mut self, query_params: Multimap) -> Self {
self.query_params = query_params;
self
}
pub fn headers(mut self, headers: Multimap) -> Self {
self.headers = headers;
self
}
pub fn body(mut self, body: Option<Vec<u8>>) -> Self {
self.body = body;
self
}
pub async fn execute(&mut self) -> Result<reqwest::Response, Error> {
// Lookup the region of the bucket if provided.
self.inner_region = if let Some(bucket) = self.bucket {
self.client.get_region(bucket, self.region).await?
} else {
"us-east-1".to_string()
};
// Execute the API request.
self.client
.execute(
self.method.clone(),
&self.inner_region,
&mut self.headers,
&self.query_params,
self.bucket,
self.object,
self.body.as_ref().map(|x| x.as_slice()),
)
.await
}
}
pub trait ToS3Request {
fn to_s3request(&self) -> Result<S3Request, Error>;
}
#[async_trait]
pub trait FromS3Response: Sized {
async fn from_s3response<'a>(
s3req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error>;
}
#[async_trait]
pub trait S3Api: ToS3Request {
type S3Response: FromS3Response;
async fn send(&self) -> Result<Self::S3Response, Error> {
let mut req = self.to_s3request()?;
let resp = req.execute().await?;
Self::S3Response::from_s3response(req, resp).await
}
}
#[async_trait]
pub trait ToStream: Sized {
type Item;
async fn to_stream(self) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send>;
}
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
/// Contains information of an item of [list_objects()](crate::s3::client::Client::list_objects) API /// Contains information of an item of [list_objects()](crate::s3::client::Client::list_objects) API

View File

@ -16,7 +16,7 @@
use async_std::task; use async_std::task;
use chrono::Duration; use chrono::Duration;
use hyper::http::Method; use hyper::http::Method;
use minio::s3::types::NotificationRecords;
use rand::distributions::{Alphanumeric, DistString}; use rand::distributions::{Alphanumeric, DistString};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::collections::HashMap; use std::collections::HashMap;
@ -30,6 +30,8 @@ use minio::s3::args::*;
use minio::s3::client::Client; use minio::s3::client::Client;
use minio::s3::creds::StaticProvider; use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl; use minio::s3::http::BaseUrl;
use minio::s3::types::NotificationRecords;
use minio::s3::types::ToStream;
use minio::s3::types::{ use minio::s3::types::{
CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo, CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo,
NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields,
@ -423,15 +425,19 @@ impl ClientTest {
let mut stream = self let mut stream = self
.client .client
.list_objects(ListObjectsArgs::new(&self.test_bucket).unwrap()) .list_objects(&self.test_bucket)
.to_stream()
.await; .await;
let mut count = 0;
while let Some(items) = stream.next().await { while let Some(items) = stream.next().await {
let items = items.unwrap(); let items = items.unwrap().contents;
for item in items.iter() { for item in items.iter() {
assert!(names.contains(&item.name)); assert!(names.contains(&item.name));
count += 1;
} }
} }
assert!(count == 3);
let mut objects: Vec<DeleteObject> = Vec::new(); let mut objects: Vec<DeleteObject> = Vec::new();
for name in names.iter() { for name in names.iter() {