Add streaming APIs for list objects (#54)

This commit is contained in:
Aditya Manthramurthy 2023-10-04 21:16:44 -07:00 committed by GitHub
parent 17a6dead9c
commit 8fb211ae0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 718 additions and 576 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@
**/*.rs.bk
Cargo.lock
.idea
*.env

1
.rustfmt.toml Normal file
View File

@ -0,0 +1 @@
edition = "2021"

View File

@ -19,7 +19,7 @@ use crate::s3::error::Error;
use crate::s3::signer::post_presign_v4;
use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{
DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
};
use crate::s3::utils::{
@ -987,19 +987,48 @@ impl<'a> RemoveObjectsArgs<'a> {
}
/// Argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API
pub struct ListObjectsV1Args<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub delimiter: Option<&'a str>,
pub encoding_type: Option<&'a str>,
#[derive(Clone, Debug)]
pub struct ListObjectsV1Args {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<&'a str>,
pub prefix: Option<String>,
pub marker: Option<String>,
}
impl<'a> ListObjectsV1Args<'a> {
/// Picks the delimiter to use for a listing request.
///
/// An explicitly supplied `delim` is always returned unchanged, regardless of
/// `recursive`. When no delimiter is supplied, a recursive listing uses none
/// (`None`) and a non-recursive listing falls back to `"/"`.
fn delim_helper(delim: Option<String>, recursive: bool) -> Option<String> {
    delim.or_else(|| (!recursive).then(|| String::from("/")))
}
impl From<ListObjectsArgs> for ListObjectsV1Args {
fn from(value: ListObjectsArgs) -> Self {
ListObjectsV1Args {
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
use_url_encoding_type: value.use_url_encoding_type,
max_keys: value.max_keys,
prefix: value.prefix,
marker: value.marker,
}
}
}
impl ListObjectsV1Args {
/// Returns argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API with given bucket name
///
/// # Examples
@ -1008,16 +1037,16 @@ impl<'a> ListObjectsV1Args<'a> {
/// use minio::s3::args::*;
/// let args = ListObjectsV1Args::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &'a str) -> Result<ListObjectsV1Args<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListObjectsV1Args, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectsV1Args {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
delimiter: None,
encoding_type: None,
use_url_encoding_type: true,
max_keys: None,
prefix: None,
marker: None,
@ -1026,22 +1055,42 @@ impl<'a> ListObjectsV1Args<'a> {
}
/// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API
pub struct ListObjectsV2Args<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub delimiter: Option<&'a str>,
pub encoding_type: Option<&'a str>,
#[derive(Clone, Debug)]
pub struct ListObjectsV2Args {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<&'a str>,
pub prefix: Option<String>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
pub fetch_owner: bool,
pub include_user_metadata: bool,
}
impl<'a> ListObjectsV2Args<'a> {
impl From<ListObjectsArgs> for ListObjectsV2Args {
fn from(value: ListObjectsArgs) -> Self {
ListObjectsV2Args {
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
use_url_encoding_type: value.use_url_encoding_type,
max_keys: value.max_keys,
prefix: value.prefix,
start_after: value.start_after,
continuation_token: value.continuation_token,
fetch_owner: value.fetch_owner,
include_user_metadata: value.include_user_metadata,
}
}
}
impl ListObjectsV2Args {
/// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name
///
/// # Examples
@ -1050,16 +1099,15 @@ impl<'a> ListObjectsV2Args<'a> {
/// use minio::s3::args::*;
/// let args = ListObjectsV2Args::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &'a str) -> Result<ListObjectsV2Args<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListObjectsV2Args, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectsV2Args {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
delimiter: None,
encoding_type: None,
use_url_encoding_type: true,
max_keys: None,
prefix: None,
start_after: None,
@ -1071,20 +1119,37 @@ impl<'a> ListObjectsV2Args<'a> {
}
/// Argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API
pub struct ListObjectVersionsArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub delimiter: Option<&'a str>,
pub encoding_type: Option<&'a str>,
pub struct ListObjectVersionsArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
pub max_keys: Option<u16>,
pub prefix: Option<&'a str>,
pub prefix: Option<String>,
pub key_marker: Option<String>,
pub version_id_marker: Option<String>,
}
impl<'a> ListObjectVersionsArgs<'a> {
impl From<ListObjectsArgs> for ListObjectVersionsArgs {
fn from(value: ListObjectsArgs) -> Self {
ListObjectVersionsArgs {
extra_headers: value.extra_headers,
extra_query_params: value.extra_query_params,
region: value.region,
bucket: value.bucket,
delimiter: delim_helper(value.delimiter, value.recursive),
use_url_encoding_type: value.use_url_encoding_type,
max_keys: value.max_keys,
prefix: value.prefix,
key_marker: value.key_marker,
version_id_marker: value.version_id_marker,
}
}
}
impl ListObjectVersionsArgs {
/// Returns argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API with given bucket name
///
/// # Examples
@ -1093,16 +1158,16 @@ impl<'a> ListObjectVersionsArgs<'a> {
/// use minio::s3::args::*;
/// let args = ListObjectVersionsArgs::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &'a str) -> Result<ListObjectVersionsArgs<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListObjectVersionsArgs, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectVersionsArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
delimiter: None,
encoding_type: None,
use_url_encoding_type: true,
max_keys: None,
prefix: None,
key_marker: None,
@ -1112,68 +1177,62 @@ impl<'a> ListObjectVersionsArgs<'a> {
}
/// Argument for [list_objects()](crate::s3::client::Client::list_objects) API
pub struct ListObjectsArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
#[derive(Clone, Debug)]
pub struct ListObjectsArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
/// Specifies the bucket name on which listing is to be performed.
pub bucket: &'a str,
pub bucket: String,
/// Delimiter to roll up common prefixes on.
pub delimiter: Option<&'a str>,
pub delimiter: Option<String>,
pub use_url_encoding_type: bool,
/// Used only with ListObjectsV1.
pub marker: Option<&'a str>,
/// Used only with ListObjectsV2
pub start_after: Option<&'a str>,
/// Used only with GetObjectVersions.
pub key_marker: Option<&'a str>,
pub max_keys: Option<u16>,
pub prefix: Option<&'a str>,
pub prefix: Option<String>,
/// Used only with ListObjectsV1.
pub marker: Option<String>,
/// Used only with ListObjectsV2
pub start_after: Option<String>,
/// Used only with ListObjectsV2.
pub continuation_token: Option<&'a str>,
pub continuation_token: Option<String>,
/// Used only with ListObjectsV2.
pub fetch_owner: bool,
/// Used only with GetObjectVersions.
pub version_id_marker: Option<&'a str>,
/// MinIO extension for ListObjectsV2.
pub include_user_metadata: bool,
/// Used only with ListObjectVersions.
pub key_marker: Option<String>,
/// Used only with ListObjectVersions.
pub version_id_marker: Option<String>,
/// This parameter takes effect only when delimiter is None. Enables
/// recursive traversal for listing of the bucket and prefix.
pub recursive: bool,
/// Set this to use ListObjectsV1. Defaults to false.
pub use_api_v1: bool,
/// Set this to include versions.
pub include_versions: bool,
/// A callback function to process results of object listing.
pub result_fn: &'a dyn Fn(Vec<Item>) -> bool,
}
impl<'a> ListObjectsArgs<'a> {
impl ListObjectsArgs {
/// Returns argument for [list_objects()](crate::s3::client::Client::list_objects) API with given bucket name.
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// let args = ListObjectsArgs::new(
/// "my-bucket",
/// &|items| {
/// for item in items.iter() {
/// println!("{:?}", item.name);
/// }
/// true
/// },
/// ).unwrap();
/// let args = ListObjectsArgs::new("my-bucket").unwrap();
/// ```
pub fn new(
bucket_name: &'a str,
result_fn: &'a dyn Fn(Vec<Item>) -> bool,
) -> Result<ListObjectsArgs<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListObjectsArgs, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListObjectsArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
delimiter: None,
use_url_encoding_type: true,
marker: None,
@ -1188,7 +1247,6 @@ impl<'a> ListObjectsArgs<'a> {
recursive: false,
use_api_v1: false,
include_versions: false,
result_fn,
})
}
}

View File

@ -23,13 +23,14 @@ use crate::s3::response::*;
use crate::s3::signer::{presign_v4, sign_v4_s3};
use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{
Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig,
Part, ReplicationConfig, RetentionMode, SseConfig,
Bucket, DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ReplicationConfig, RetentionMode, SseConfig,
};
use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap,
to_amz_date, to_iso8601utc, utc_now, Multimap,
};
use async_recursion::async_recursion;
use bytes::{Buf, Bytes};
use dashmap::DashMap;
@ -44,170 +45,12 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use xmltree::Element;
mod list_objects;
mod listen_bucket_notification;
pub use list_objects::*;
pub use listen_bucket_notification::*;
fn url_decode(
encoding_type: &Option<String>,
prefix: Option<String>,
) -> Result<Option<String>, Error> {
if let Some(v) = encoding_type.as_ref() {
if v == "url" {
if let Some(v) = prefix {
return Ok(Some(urldecode(&v)?.to_string()));
}
}
}
if let Some(v) = prefix.as_ref() {
return Ok(Some(v.to_string()));
}
Ok(None)
}
fn add_common_list_objects_query_params(
query_params: &mut Multimap,
delimiter: Option<&str>,
encoding_type: Option<&str>,
max_keys: Option<u16>,
prefix: Option<&str>,
) {
query_params.insert(
String::from("delimiter"),
delimiter.unwrap_or("").to_string(),
);
query_params.insert(
String::from("max-keys"),
max_keys.unwrap_or(1000).to_string(),
);
query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string());
if let Some(v) = encoding_type {
query_params.insert(String::from("encoding-type"), v.to_string());
}
}
fn parse_common_list_objects_response(
root: &Element,
) -> Result<
(
String,
Option<String>,
Option<String>,
Option<String>,
bool,
Option<u16>,
),
Error,
> {
let encoding_type = get_option_text(root, "EncodingType");
let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?;
Ok((
get_text(root, "Name")?,
encoding_type,
prefix,
get_option_text(root, "Delimiter"),
match get_option_text(root, "IsTruncated") {
Some(v) => v.to_lowercase() == "true",
None => false,
},
match get_option_text(root, "MaxKeys") {
Some(v) => Some(v.parse::<u16>()?),
None => None,
},
))
}
fn parse_list_objects_contents(
contents: &mut Vec<Item>,
root: &mut xmltree::Element,
tag: &str,
encoding_type: &Option<String>,
is_delete_marker: bool,
) -> Result<(), Error> {
while let Some(v) = root.take_child(tag) {
let content = v;
let etype = encoding_type.as_ref().cloned();
let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap();
let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?);
let etag = get_option_text(&content, "ETag");
let v = get_default_text(&content, "Size");
let size = match v.is_empty() {
true => None,
false => Some(v.parse::<usize>()?),
};
let storage_class = get_option_text(&content, "StorageClass");
let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true";
let version_id = get_option_text(&content, "VersionId");
let (owner_id, owner_name) = match content.get_child("Owner") {
Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")),
None => (None, None),
};
let user_metadata = match content.get_child("UserMetadata") {
Some(v) => {
let mut map: HashMap<String, String> = HashMap::new();
for xml_node in &v.children {
let u = xml_node
.as_element()
.ok_or(Error::XmlError("unable to convert to element".to_string()))?;
map.insert(
u.name.to_string(),
u.get_text().unwrap_or_default().to_string(),
);
}
Some(map)
}
None => None,
};
contents.push(Item {
name: key,
last_modified,
etag,
owner_id,
owner_name,
size,
storage_class,
is_latest,
version_id,
user_metadata,
is_prefix: false,
is_delete_marker,
encoding_type: etype,
});
}
Ok(())
}
fn parse_list_objects_common_prefixes(
contents: &mut Vec<Item>,
root: &mut Element,
encoding_type: &Option<String>,
) -> Result<(), Error> {
while let Some(v) = root.take_child("CommonPrefixes") {
let common_prefix = v;
contents.push(Item {
name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(),
last_modified: None,
etag: None,
owner_id: None,
owner_name: None,
size: None,
storage_class: None,
is_latest: false,
version_id: None,
user_metadata: None,
is_prefix: true,
is_delete_marker: false,
encoding_type: encoding_type.as_ref().cloned(),
});
}
Ok(())
}
/// Client Builder manufactures a Client using given parameters.
#[derive(Debug, Default)]
pub struct ClientBuilder {
@ -315,7 +158,7 @@ impl Client {
/// use minio::s3::client::Client;
/// use minio::s3::creds::StaticProvider;
/// use minio::s3::http::BaseUrl;
/// let mut base_url: BaseUrl = "play.min.io".parse().unwrap();
/// let base_url: BaseUrl = "play.min.io".parse().unwrap();
/// let static_provider = StaticProvider::new(
/// "Q3AM3UQ867SPQQA43P2F",
/// "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
@ -2433,327 +2276,6 @@ impl Client {
})
}
/// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API
pub async fn list_objects_v1(
&self,
args: &ListObjectsV1Args<'_>,
) -> Result<ListObjectsV1Response, Error> {
let region = self.get_region(args.bucket, args.region).await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
add_common_list_objects_query_params(
&mut query_params,
args.delimiter,
args.encoding_type,
args.max_keys,
args.prefix,
);
if let Some(v) = &args.marker {
query_params.insert(String::from("marker"), v.to_string());
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?;
let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?;
let mut contents: Vec<Item> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
if is_truncated && next_marker.is_none() {
next_marker = contents.last().map(|v| v.name.clone())
}
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
Ok(ListObjectsV1Response {
headers: header_map,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
marker,
next_marker,
})
}
/// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API
pub async fn list_objects_v2(
&self,
args: &ListObjectsV2Args<'_>,
) -> Result<ListObjectsV2Response, Error> {
let region = self.get_region(args.bucket, args.region).await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("list-type"), String::from("2"));
add_common_list_objects_query_params(
&mut query_params,
args.delimiter,
args.encoding_type,
args.max_keys,
args.prefix,
);
if let Some(v) = &args.continuation_token {
query_params.insert(String::from("continuation-token"), v.to_string());
}
if args.fetch_owner {
query_params.insert(String::from("fetch-owner"), String::from("true"));
}
if let Some(v) = &args.start_after {
query_params.insert(String::from("start-after"), v.to_string());
}
if args.include_user_metadata {
query_params.insert(String::from("metadata"), String::from("true"));
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let text = get_option_text(&root, "KeyCount");
let key_count = match text {
Some(v) => match v.is_empty() {
true => None,
false => Some(v.parse::<u16>()?),
},
None => None,
};
let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?;
let continuation_token = get_option_text(&root, "ContinuationToken");
let next_continuation_token = get_option_text(&root, "NextContinuationToken");
let mut contents: Vec<Item> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
Ok(ListObjectsV2Response {
headers: header_map,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
key_count,
start_after,
continuation_token,
next_continuation_token,
})
}
/// Executes [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) S3 API
pub async fn list_object_versions(
&self,
args: &ListObjectVersionsArgs<'_>,
) -> Result<ListObjectVersionsResponse, Error> {
let region = self.get_region(args.bucket, args.region).await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
query_params.insert(String::from("versions"), String::new());
add_common_list_objects_query_params(
&mut query_params,
args.delimiter,
args.encoding_type,
args.max_keys,
args.prefix,
);
if let Some(v) = &args.key_marker {
query_params.insert(String::from("key-marker"), v.to_string());
}
if let Some(v) = &args.version_id_marker {
query_params.insert(String::from("version-id-marker"), v.to_string());
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body = resp.bytes().await?;
let mut root = Element::parse(body.reader())?;
let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
parse_common_list_objects_response(&root)?;
let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?;
let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?;
let version_id_marker = get_option_text(&root, "VersionIdMarker");
let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker");
let mut contents: Vec<Item> = Vec::new();
parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?;
parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
parse_list_objects_contents(
&mut contents,
&mut root,
"DeleteMarker",
&encoding_type,
true,
)?;
Ok(ListObjectVersionsResponse {
headers: header_map,
name,
encoding_type,
prefix,
delimiter,
is_truncated,
max_keys,
contents,
key_marker,
next_key_marker,
version_id_marker,
next_version_id_marker,
})
}
/// List objects with version information optionally. `results_fn` callback
/// function is repeatedly called with object information and returning
/// false from the callback stops further listing.
pub async fn list_objects(&self, args: &ListObjectsArgs<'_>) -> Result<(), Error> {
let mut lov1_args = ListObjectsV1Args::new(args.bucket)?;
lov1_args.extra_headers = args.extra_headers;
lov1_args.extra_query_params = args.extra_query_params;
lov1_args.region = args.region;
if args.recursive {
lov1_args.delimiter = None;
} else {
lov1_args.delimiter = Some(args.delimiter.unwrap_or("/"));
}
lov1_args.encoding_type = match args.use_url_encoding_type {
true => Some("url"),
false => None,
};
lov1_args.max_keys = args.max_keys;
lov1_args.prefix = args.prefix;
lov1_args.marker = args.marker.map(|x| x.to_string());
let mut lov2_args = ListObjectsV2Args::new(args.bucket)?;
lov2_args.extra_headers = args.extra_headers;
lov2_args.extra_query_params = args.extra_query_params;
lov2_args.region = args.region;
if args.recursive {
lov2_args.delimiter = None;
} else {
lov2_args.delimiter = Some(args.delimiter.unwrap_or("/"));
}
lov2_args.encoding_type = match args.use_url_encoding_type {
true => Some("url"),
false => None,
};
lov2_args.max_keys = args.max_keys;
lov2_args.prefix = args.prefix;
lov2_args.start_after = args.start_after.map(|x| x.to_string());
lov2_args.continuation_token = args.continuation_token.map(|x| x.to_string());
lov2_args.fetch_owner = args.fetch_owner;
lov2_args.include_user_metadata = args.include_user_metadata;
let mut lov_args = ListObjectVersionsArgs::new(args.bucket)?;
lov_args.extra_headers = args.extra_headers;
lov_args.extra_query_params = args.extra_query_params;
lov_args.region = args.region;
if args.recursive {
lov_args.delimiter = None;
} else {
lov_args.delimiter = Some(args.delimiter.unwrap_or("/"));
}
lov_args.encoding_type = match args.use_url_encoding_type {
true => Some("url"),
false => None,
};
lov_args.max_keys = args.max_keys;
lov_args.prefix = args.prefix;
lov_args.key_marker = args.key_marker.map(|x| x.to_string());
lov_args.version_id_marker = args.version_id_marker.map(|x| x.to_string());
let mut stop = false;
while !stop {
if args.include_versions {
let resp = self.list_object_versions(&lov_args).await?;
stop = !resp.is_truncated;
if resp.is_truncated {
lov_args.key_marker = resp.next_key_marker;
lov_args.version_id_marker = resp.next_version_id_marker;
}
stop = stop || !(args.result_fn)(resp.contents);
} else if args.use_api_v1 {
let resp = self.list_objects_v1(&lov1_args).await?;
stop = !resp.is_truncated;
if resp.is_truncated {
lov1_args.marker = resp.next_marker;
}
stop = stop || !(args.result_fn)(resp.contents);
} else {
let resp = self.list_objects_v2(&lov2_args).await?;
stop = !resp.is_truncated;
if resp.is_truncated {
lov2_args.start_after = resp.start_after;
lov2_args.continuation_token = resp.next_continuation_token;
}
stop = stop || !(args.result_fn)(resp.contents);
}
}
Ok(())
}
pub async fn make_bucket(
&self,
args: &MakeBucketArgs<'_>,

View File

@ -0,0 +1,561 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! S3 API: ListObjectsV2, ListObjectsV1, ListObjectVersions and streaming helper.
use std::collections::HashMap;
use super::Client;
use crate::s3::{
args::{ListObjectVersionsArgs, ListObjectsArgs, ListObjectsV1Args, ListObjectsV2Args},
error::Error,
response::{ListObjectVersionsResponse, ListObjectsV1Response, ListObjectsV2Response},
types::ListEntry,
utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, merge, urldecode, Multimap,
},
};
use bytes::Buf;
use futures_util::{stream as futures_stream, Stream, StreamExt};
use http::Method;
use xmltree::Element;
/// Decodes `prefix` when the response declares URL encoding.
///
/// Returns `Ok(None)` when `prefix` is `None`. When `encoding_type` is exactly
/// `"url"` the text is passed through `urldecode` (whose error is propagated);
/// for any other or missing encoding type the text is returned unchanged.
fn url_decode(
    encoding_type: &Option<String>,
    prefix: Option<String>,
) -> Result<Option<String>, Error> {
    let raw = match prefix {
        Some(text) => text,
        None => return Ok(None),
    };

    let decoded = match encoding_type.as_deref() {
        Some("url") => urldecode(&raw)?.to_string(),
        _ => raw,
    };

    Ok(Some(decoded))
}
/// Adds the query parameters shared by all list-objects API variants.
///
/// `delimiter`, `max-keys` and `prefix` are always inserted: a missing
/// delimiter or prefix is sent as an empty string and a missing `max_keys`
/// as `1000`. `encoding-type=url` is inserted only when
/// `use_url_encoding_type` is set.
fn add_common_list_objects_query_params(
    query_params: &mut Multimap,
    delimiter: Option<&str>,
    use_url_encoding_type: bool,
    max_keys: Option<u16>,
    prefix: Option<&str>,
) {
    let delimiter_value = delimiter.unwrap_or_default().to_owned();
    let max_keys_value = max_keys.unwrap_or(1000).to_string();
    let prefix_value = prefix.unwrap_or_default().to_owned();

    query_params.insert("delimiter".to_owned(), delimiter_value);
    query_params.insert("max-keys".to_owned(), max_keys_value);
    query_params.insert("prefix".to_owned(), prefix_value);

    if use_url_encoding_type {
        query_params.insert("encoding-type".to_owned(), "url".to_owned());
    }
}
/// Extracts the fields common to every list-objects XML response.
///
/// Returns, in order: bucket name (`Name`, required), `EncodingType`, the
/// `Prefix` (decoded via `url_decode` according to the encoding type; a
/// missing element is treated as its default text), `Delimiter`,
/// `IsTruncated` (true only when the text equals "true" case-insensitively,
/// false when absent) and `MaxKeys` parsed as `u16` when present.
///
/// # Errors
///
/// Fails when the prefix cannot be decoded, `Name` is missing, or `MaxKeys`
/// is present but not a valid `u16`.
fn parse_common_list_objects_response(
    root: &Element,
) -> Result<
    (
        String,
        Option<String>,
        Option<String>,
        Option<String>,
        bool,
        Option<u16>,
    ),
    Error,
> {
    let encoding_type = get_option_text(root, "EncodingType");
    let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?;
    let name = get_text(root, "Name")?;
    let delimiter = get_option_text(root, "Delimiter");
    let is_truncated =
        get_option_text(root, "IsTruncated").map_or(false, |flag| flag.to_lowercase() == "true");
    let max_keys = get_option_text(root, "MaxKeys")
        .map(|text| text.parse::<u16>())
        .transpose()?;

    Ok((
        name,
        encoding_type,
        prefix,
        delimiter,
        is_truncated,
        max_keys,
    ))
}
fn parse_list_objects_contents(
contents: &mut Vec<ListEntry>,
root: &mut xmltree::Element,
tag: &str,
encoding_type: &Option<String>,
is_delete_marker: bool,
) -> Result<(), Error> {
while let Some(v) = root.take_child(tag) {
let content = v;
let etype = encoding_type.as_ref().cloned();
let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap();
let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?);
let etag = get_option_text(&content, "ETag");
let v = get_default_text(&content, "Size");
let size = match v.is_empty() {
true => None,
false => Some(v.parse::<usize>()?),
};
let storage_class = get_option_text(&content, "StorageClass");
let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true";
let version_id = get_option_text(&content, "VersionId");
let (owner_id, owner_name) = match content.get_child("Owner") {
Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")),
None => (None, None),
};
let user_metadata = match content.get_child("UserMetadata") {
Some(v) => {
let mut map: HashMap<String, String> = HashMap::new();
for xml_node in &v.children {
let u = xml_node
.as_element()
.ok_or(Error::XmlError("unable to convert to element".to_string()))?;
map.insert(
u.name.to_string(),
u.get_text().unwrap_or_default().to_string(),
);
}
Some(map)
}
None => None,
};
contents.push(ListEntry {
name: key,
last_modified,
etag,
owner_id,
owner_name,
size,
storage_class,
is_latest,
version_id,
user_metadata,
is_prefix: false,
is_delete_marker,
encoding_type: etype,
});
}
Ok(())
}
/// Drains every `CommonPrefixes` child from `root` and appends a prefix-only
/// `ListEntry` for each to `contents`.
///
/// Only the name (the required `Prefix` text, decoded per `encoding_type`),
/// `is_prefix = true` and the encoding type are populated; every other field
/// is left empty or false.
///
/// # Errors
///
/// Fails when a `CommonPrefixes` element has no `Prefix` or the prefix cannot
/// be decoded.
fn parse_list_objects_common_prefixes(
    contents: &mut Vec<ListEntry>,
    root: &mut Element,
    encoding_type: &Option<String>,
) -> Result<(), Error> {
    while let Some(node) = root.take_child("CommonPrefixes") {
        // `url_decode` only yields `None` for a `None` input, so the unwrap
        // cannot fail here.
        let name = url_decode(encoding_type, Some(get_text(&node, "Prefix")?))?.unwrap();

        let entry = ListEntry {
            name,
            last_modified: None,
            etag: None,
            owner_id: None,
            owner_name: None,
            size: None,
            storage_class: None,
            is_latest: false,
            version_id: None,
            user_metadata: None,
            is_prefix: true,
            is_delete_marker: false,
            encoding_type: encoding_type.clone(),
        };
        contents.push(entry);
    }

    Ok(())
}
impl Client {
/// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API
///
/// Issues a single `GET` on the bucket with the common listing query
/// parameters plus `marker` (when set), then parses the XML body into a
/// [`ListObjectsV1Response`]. Objects are listed before common prefixes in
/// `contents`.
///
/// When the response is truncated but carries no `NextMarker`, the name of
/// the last returned object is used as the next marker.
///
/// # Errors
///
/// Propagates failures from region lookup, the HTTP request, reading the
/// body, XML parsing and decoding/parsing of individual response fields.
pub async fn list_objects_v1(
    &self,
    args: &ListObjectsV1Args,
) -> Result<ListObjectsV1Response, Error> {
    let bucket = args.bucket.as_str();
    let region = self.get_region(bucket, args.region.as_deref()).await?;

    let mut headers = Multimap::new();
    if let Some(extra) = args.extra_headers.as_ref() {
        merge(&mut headers, extra);
    }

    let mut query_params = Multimap::new();
    if let Some(extra) = args.extra_query_params.as_ref() {
        merge(&mut query_params, extra);
    }
    add_common_list_objects_query_params(
        &mut query_params,
        args.delimiter.as_deref(),
        args.use_url_encoding_type,
        args.max_keys,
        args.prefix.as_deref(),
    );
    if let Some(marker) = args.marker.as_ref() {
        query_params.insert("marker".to_owned(), marker.to_string());
    }

    let resp = self
        .execute(
            Method::GET,
            &region,
            &mut headers,
            &query_params,
            Some(bucket),
            None,
            None,
        )
        .await?;
    let header_map = resp.headers().clone();
    let body = resp.bytes().await?;
    let mut root = Element::parse(body.reader())?;

    let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
        parse_common_list_objects_response(&root)?;
    let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?;
    let reported_next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?;

    let mut contents: Vec<ListEntry> = Vec::new();
    parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;

    // Must run before common prefixes are appended: the fallback marker is
    // the last *object*, not the last prefix.
    let next_marker = match reported_next_marker {
        None if is_truncated => contents.last().map(|entry| entry.name.clone()),
        other => other,
    };

    parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;

    Ok(ListObjectsV1Response {
        headers: header_map,
        name,
        encoding_type,
        prefix,
        delimiter,
        is_truncated,
        max_keys,
        contents,
        marker,
        next_marker,
    })
}
/// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API
///
/// Issues a single `GET` on the bucket with `list-type=2`, the common
/// listing query parameters, and — when set on `args` —
/// `continuation-token`, `fetch-owner=true`, `start-after` and
/// `metadata=true`. The XML body is parsed into a
/// [`ListObjectsV2Response`]; objects are listed before common prefixes in
/// `contents`.
///
/// # Errors
///
/// Propagates failures from region lookup, the HTTP request, reading the
/// body, XML parsing and decoding/parsing of individual response fields
/// (including a non-empty `KeyCount` that is not a valid `u16`).
pub async fn list_objects_v2(
    &self,
    args: &ListObjectsV2Args,
) -> Result<ListObjectsV2Response, Error> {
    let bucket = args.bucket.as_str();
    let region = self.get_region(bucket, args.region.as_deref()).await?;

    let mut headers = Multimap::new();
    if let Some(extra) = args.extra_headers.as_ref() {
        merge(&mut headers, extra);
    }

    let mut query_params = Multimap::new();
    if let Some(extra) = args.extra_query_params.as_ref() {
        merge(&mut query_params, extra);
    }
    query_params.insert("list-type".to_owned(), "2".to_owned());
    add_common_list_objects_query_params(
        &mut query_params,
        args.delimiter.as_deref(),
        args.use_url_encoding_type,
        args.max_keys,
        args.prefix.as_deref(),
    );
    if let Some(token) = args.continuation_token.as_ref() {
        query_params.insert("continuation-token".to_owned(), token.to_string());
    }
    if args.fetch_owner {
        query_params.insert("fetch-owner".to_owned(), "true".to_owned());
    }
    if let Some(start) = args.start_after.as_ref() {
        query_params.insert("start-after".to_owned(), start.to_string());
    }
    if args.include_user_metadata {
        query_params.insert("metadata".to_owned(), "true".to_owned());
    }

    let resp = self
        .execute(
            Method::GET,
            &region,
            &mut headers,
            &query_params,
            Some(bucket),
            None,
            None,
        )
        .await?;
    let header_map = resp.headers().clone();
    let body = resp.bytes().await?;
    let mut root = Element::parse(body.reader())?;

    let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
        parse_common_list_objects_response(&root)?;

    // A missing or empty KeyCount is reported as unknown rather than zero.
    let key_count = get_option_text(&root, "KeyCount")
        .filter(|text| !text.is_empty())
        .map(|text| text.parse::<u16>())
        .transpose()?;
    let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?;
    let continuation_token = get_option_text(&root, "ContinuationToken");
    let next_continuation_token = get_option_text(&root, "NextContinuationToken");

    let mut contents: Vec<ListEntry> = Vec::new();
    parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?;
    parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;

    Ok(ListObjectsV2Response {
        headers: header_map,
        name,
        encoding_type,
        prefix,
        delimiter,
        is_truncated,
        max_keys,
        contents,
        key_count,
        start_after,
        continuation_token,
        next_continuation_token,
    })
}
/// Performs a single [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) S3 API call.
///
/// The request is built from `args`, and the XML reply is parsed into a
/// [`ListObjectVersionsResponse`] together with the response headers. No
/// pagination is done here; exactly one request is sent.
pub async fn list_object_versions(
    &self,
    args: &ListObjectVersionsArgs,
) -> Result<ListObjectVersionsResponse, Error> {
    let region = self
        .get_region(&args.bucket, args.region.as_deref())
        .await?;

    // Start from the caller-supplied extras, then add the listing parameters.
    let mut headers = Multimap::new();
    if let Some(extra) = args.extra_headers.as_ref() {
        merge(&mut headers, extra);
    }

    let mut query_params = Multimap::new();
    if let Some(extra) = args.extra_query_params.as_ref() {
        merge(&mut query_params, extra);
    }
    // `versions` is sent as a key with an empty value.
    query_params.insert(String::from("versions"), String::new());
    add_common_list_objects_query_params(
        &mut query_params,
        args.delimiter.as_deref(),
        args.use_url_encoding_type,
        args.max_keys,
        args.prefix.as_deref(),
    );
    if let Some(key) = args.key_marker.as_ref() {
        query_params.insert(String::from("key-marker"), key.to_string());
    }
    if let Some(version) = args.version_id_marker.as_ref() {
        query_params.insert(String::from("version-id-marker"), version.to_string());
    }

    let resp = self
        .execute(
            Method::GET,
            &region,
            &mut headers,
            &query_params,
            Some(&args.bucket),
            None,
            None,
        )
        .await?;

    // Keep a copy of the headers: reading the body consumes the response.
    let response_headers = resp.headers().clone();
    let body = resp.bytes().await?;
    let mut root = Element::parse(body.reader())?;

    let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) =
        parse_common_list_objects_response(&root)?;

    // Key markers are URL-decoded; version-id markers are taken verbatim.
    let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?;
    let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?;
    let version_id_marker = get_option_text(&root, "VersionIdMarker");
    let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker");

    // Entries are collected in this order: versions, common prefixes, then
    // delete markers (parsed with the final flag set to `true`).
    let mut contents: Vec<ListEntry> = Vec::new();
    parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?;
    parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?;
    parse_list_objects_contents(
        &mut contents,
        &mut root,
        "DeleteMarker",
        &encoding_type,
        true,
    )?;

    Ok(ListObjectVersionsResponse {
        headers: response_headers,
        name,
        encoding_type,
        prefix,
        delimiter,
        is_truncated,
        max_keys,
        contents,
        key_marker,
        next_key_marker,
        version_id_marker,
        next_version_id_marker,
    })
}
/// Returns a stream that pages through ListObjects (v1) results.
///
/// Each item is the outcome of one `list_objects_v1` call. After a truncated
/// response, the next request resumes from that response's `next_marker`.
///
/// The stream ends after yielding the first response that is not truncated
/// (that final page is yielded, not dropped), or after yielding an error.
async fn list_objects_v1_stream(
    &self,
    args: ListObjectsV1Args,
) -> impl Stream<Item = Result<ListObjectsV1Response, Error>> + Unpin {
    // The state carries the arguments for the next request; `None` means
    // there is nothing left to fetch.
    Box::pin(futures_stream::unfold(
        (self.clone(), Some(args)),
        move |(client, pending)| async move {
            let mut args = match pending {
                Some(args) => args,
                None => return None,
            };
            match client.list_objects_v1(&args).await {
                Ok(resp) => {
                    // Always yield the page. Only schedule a follow-up
                    // request when the server says more results remain.
                    let next = if resp.is_truncated {
                        args.marker = resp.next_marker.clone();
                        Some(args)
                    } else {
                        None
                    };
                    Some((Ok(resp), (client, next)))
                }
                // Report the failure once and stop. Retrying with the same
                // arguments would repeat a persistent error forever.
                Err(e) => Some((Err(e), (client, None))),
            }
        },
    ))
}
/// Returns a stream that pages through ListObjectsV2 results.
///
/// Each item is the outcome of one `list_objects_v2` call. After a truncated
/// response, the next request uses that response's `next_continuation_token`.
///
/// The stream ends after yielding the first response that is not truncated
/// (that final page is yielded, not dropped), or after yielding an error.
async fn list_objects_v2_stream(
    &self,
    args: ListObjectsV2Args,
) -> impl Stream<Item = Result<ListObjectsV2Response, Error>> + Unpin {
    // The state carries the arguments for the next request; `None` means
    // there is nothing left to fetch.
    Box::pin(futures_stream::unfold(
        (self.clone(), Some(args)),
        move |(client, pending)| async move {
            let mut args = match pending {
                Some(args) => args,
                None => return None,
            };
            match client.list_objects_v2(&args).await {
                Ok(resp) => {
                    // Always yield the page. Only schedule a follow-up
                    // request when the server says more results remain.
                    let next = if resp.is_truncated {
                        args.continuation_token = resp.next_continuation_token.clone();
                        Some(args)
                    } else {
                        None
                    };
                    Some((Ok(resp), (client, next)))
                }
                // Report the failure once and stop. Retrying with the same
                // arguments would repeat a persistent error forever.
                Err(e) => Some((Err(e), (client, None))),
            }
        },
    ))
}
/// Returns a stream that pages through ListObjectVersions results.
///
/// Each item is the outcome of one `list_object_versions` call. After a
/// truncated response, the next request resumes from that response's
/// `next_key_marker` and `next_version_id_marker`.
///
/// The stream ends after yielding the first response that is not truncated
/// (that final page is yielded, not dropped), or after yielding an error.
async fn list_object_versions_stream(
    &self,
    args: ListObjectVersionsArgs,
) -> impl Stream<Item = Result<ListObjectVersionsResponse, Error>> + Unpin {
    // The state carries the arguments for the next request; `None` means
    // there is nothing left to fetch.
    Box::pin(futures_stream::unfold(
        (self.clone(), Some(args)),
        move |(client, pending)| async move {
            let mut args = match pending {
                Some(args) => args,
                None => return None,
            };
            match client.list_object_versions(&args).await {
                Ok(resp) => {
                    // Always yield the page. Only schedule a follow-up
                    // request when the server says more results remain.
                    let next = if resp.is_truncated {
                        args.key_marker = resp.next_key_marker.clone();
                        args.version_id_marker = resp.next_version_id_marker.clone();
                        Some(args)
                    } else {
                        None
                    };
                    Some((Ok(resp), (client, next)))
                }
                // Report the failure once and stop. Retrying with the same
                // arguments would repeat a persistent error forever.
                Err(e) => Some((Err(e), (client, None))),
            }
        },
    ))
}
/// Lists the entries of a bucket, optionally with version information.
///
/// Pagination is handled internally. The returned stream yields one item per
/// underlying listing API call, and each successful item holds the entries
/// of that call's response.
///
/// The API used depends on `args`: the object-versions listing when
/// `include_versions` is set, otherwise ListObjects v1 when `use_api_v1` is
/// set, otherwise ListObjects v2.
///
/// # Example
///
/// ```rust,no_run
/// use minio::s3::client::{Client, ClientBuilder};
/// use minio::s3::creds::StaticProvider;
/// use minio::s3::http::BaseUrl;
/// use minio::s3::args::ListObjectsArgs;
/// use futures_util::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
///     let base_url: BaseUrl = "play.min.io".parse().unwrap();
///     let static_provider = StaticProvider::new(
///         "Q3AM3UQ867SPQQA43P2F",
///         "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
///         None,
///     );
///
///     let client = ClientBuilder::new(base_url)
///         .provider(Some(Box::new(static_provider)))
///         .build()
///         .unwrap();
///
///     // List all objects in a directory.
///     let mut list_objects_arg = ListObjectsArgs::new("my-bucket").unwrap();
///     list_objects_arg.recursive = true;
///     let mut stream = client.list_objects(list_objects_arg).await;
///     while let Some(result) = stream.next().await {
///         match result {
///             Ok(items) => {
///                 for item in items {
///                     println!("{:?}", item);
///                 }
///             }
///             Err(e) => println!("Error: {:?}", e),
///         }
///     }
/// }
/// ```
pub async fn list_objects(
    &self,
    args: ListObjectsArgs,
) -> Box<dyn Stream<Item = Result<Vec<ListEntry>, Error>> + Unpin> {
    // Read the selector flags before `args` is consumed by the conversion
    // into the API-specific argument type.
    match (args.include_versions, args.use_api_v1) {
        (true, _) => {
            let pages = self.list_object_versions_stream(args.into()).await;
            Box::new(pages.map(|page| page.map(|resp| resp.contents)))
        }
        (false, true) => {
            let pages = self.list_objects_v1_stream(args.into()).await;
            Box::new(pages.map(|page| page.map(|resp| resp.contents)))
        }
        (false, false) => {
            let pages = self.list_objects_v2_stream(args.into()).await;
            Box::new(pages.map(|page| page.map(|resp| resp.contents)))
        }
    }
}
}

View File

@ -17,7 +17,7 @@
use crate::s3::error::Error;
use crate::s3::types::{
parse_legal_hold, Bucket, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig,
parse_legal_hold, Bucket, LifecycleConfig, ListEntry, NotificationConfig, ObjectLockConfig,
ReplicationConfig, RetentionMode, SelectProgress, SseConfig,
};
use crate::s3::utils::{
@ -245,7 +245,7 @@ pub struct ListObjectsV1Response {
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<Item>,
pub contents: Vec<ListEntry>,
pub marker: Option<String>,
pub next_marker: Option<String>,
}
@ -260,7 +260,7 @@ pub struct ListObjectsV2Response {
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<Item>,
pub contents: Vec<ListEntry>,
pub key_count: Option<u16>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
@ -277,7 +277,7 @@ pub struct ListObjectVersionsResponse {
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<Item>,
pub contents: Vec<ListEntry>,
pub key_marker: Option<String>,
pub next_key_marker: Option<String>,
pub version_id_marker: Option<String>,
@ -294,7 +294,7 @@ pub struct ListObjectsResponse {
pub delimiter: Option<String>,
pub is_truncated: bool,
pub max_keys: Option<u16>,
pub contents: Vec<Item>,
pub contents: Vec<ListEntry>,
// ListObjectsV1
pub marker: String,

View File

@ -26,7 +26,7 @@ use xmltree::Element;
#[derive(Clone, Debug, Default)]
/// Contains information of an item of [list_objects()](crate::s3::client::Client::list_objects) API
pub struct Item {
pub struct ListEntry {
pub name: String,
pub last_modified: Option<UtcTime>,
pub etag: Option<String>, // except DeleteMarker

View File

@ -15,7 +15,6 @@
use async_std::task;
use chrono::Duration;
use futures_util::stream::StreamExt;
use hyper::http::Method;
use minio::s3::types::NotificationRecords;
use rand::distributions::{Alphanumeric, DistString};
@ -25,6 +24,7 @@ use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::{fs, io};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
use minio::s3::args::*;
use minio::s3::client::Client;
@ -421,18 +421,17 @@ impl ClientTest {
names.push(object_name);
}
self.client
.list_objects(
&ListObjectsArgs::new(&self.test_bucket, &|items| {
let mut stream = self
.client
.list_objects(ListObjectsArgs::new(&self.test_bucket).unwrap())
.await;
while let Some(items) = stream.next().await {
let items = items.unwrap();
for item in items.iter() {
assert!(names.contains(&item.name));
}
true
})
.unwrap(),
)
.await
.unwrap();
}
let mut objects: Vec<DeleteObject> = Vec::new();
for name in names.iter() {