Add list_objects API (#2)

This commit is contained in:
Krishnan Parthasarathi 2019-06-24 11:59:37 -07:00 committed by Aditya Manthramurthy
parent 36a3b3c155
commit 124b254009
4 changed files with 209 additions and 14 deletions

View File

@ -62,9 +62,15 @@ fn main() {
})
.map_err(|err| println!("{:?}", err));
let list_objects_req = c
.list_objects(bucket, None, None, None, None)
.map(|l_obj_resp| println!("{:?} {:?}", l_obj_resp, l_obj_resp.object_infos.len()))
.map_err(|err| println!("{:?}", err));
del_req
.join5(make_bucket_req, region_req, buc_exists_req, download_req)
.map(|_| ())
.then(|_| list_buckets_req)
.and_then(|_| list_buckets_req)
.then(|_| list_objects_req)
}));
}

View File

@ -18,7 +18,7 @@ use std::string::String;
use time;
use time::Tm;
use types::{Err, GetObjectResp, Region};
use types::{Err, GetObjectResp, ListObjectsResp, Region};
pub use types::BucketInfo;
@ -312,6 +312,50 @@ impl Client {
.and_then(|s| xml::parse_bucket_list(s))
})
}
pub fn list_objects(
&self,
b: &str,
prefix: Option<&str>,
marker: Option<&str>,
delimiter: Option<&str>,
max_keys: Option<i32>,
) -> impl Future<Item = ListObjectsResp, Error = Err> {
let mut qparams = HashMap::new();
qparams.insert("list-type".to_string(), Some("2".to_string()));
if let Some(d) = delimiter {
qparams.insert("delimiter".to_string(), Some(d.to_string()));
}
if let Some(m) = marker {
qparams.insert("marker".to_string(), Some(m.to_string()));
}
if let Some(p) = prefix {
qparams.insert("prefix".to_string(), Some(p.to_string()));
}
if let Some(mkeys) = max_keys {
qparams.insert("max-keys".to_string(), Some(mkeys.to_string()));
}
let s3_req = S3Req {
method: Method::GET,
bucket: Some(b.to_string()),
object: None,
query: qparams,
headers: HeaderMap::new(),
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req, Ok(Body::empty()))
.and_then(|resp| {
resp.into_body()
.concat2()
.map_err(|err| Err::HyperErr(err))
.and_then(move |chunk| b2s(chunk.into_bytes()))
.and_then(|s| xml::parse_list_objects(s))
})
}
}
fn run_req_future(

View File

@ -6,6 +6,7 @@ use hyper::header::{
};
use hyper::{body::Body, Response};
use roxmltree;
use std::collections::HashMap;
use std::string;
use time::{strptime, Tm};
@ -35,7 +36,9 @@ pub enum Err {
HyperErr(hyper::Error),
FailStatusCodeErr(hyper::StatusCode, Bytes),
Utf8DecodingErr(string::FromUtf8Error),
XmlParseErr(roxmltree::Error),
XmlDocParseErr(roxmltree::Error),
XmlElemMissing(String),
XmlElemParseErr(String),
InvalidXmlResponseErr(String),
MissingRequiredParams,
RawSvcErr(hyper::StatusCode, Response<Body>),
@ -101,6 +104,11 @@ fn extract_user_meta(h: &HeaderMap) -> Vec<(String, String)> {
.collect()
}
fn parse_aws_time(time_str: &str) -> Result<Tm, Err> {
strptime(time_str, "%Y-%m-%dT%H:%M:%S.%Z")
.map_err(|err| Err::InvalidTmFmt(format!("{:?}", err)))
}
#[derive(Debug)]
pub struct BucketInfo {
pub name: String,
@ -108,14 +116,56 @@ pub struct BucketInfo {
}
impl BucketInfo {
pub fn new(name: &str, time_str: &str) -> Result<BucketInfo, String> {
strptime(time_str, "%Y-%m-%dT%H:%M:%S.%Z")
.and_then(|ctime| {
Ok(BucketInfo {
name: name.to_string(),
created_time: ctime,
})
pub fn new(name: &str, time_str: &str) -> Result<BucketInfo, Err> {
parse_aws_time(time_str).and_then(|ctime| {
Ok(BucketInfo {
name: name.to_string(),
created_time: ctime,
})
.or_else(|err| Err(format!("{:?}", err)))
})
}
}
#[derive(Debug)]
pub struct ObjectInfo {
pub name: String,
pub modified_time: Tm,
pub etag: String,
pub size: i64,
pub storage_class: String,
pub metadata: HashMap<String, String>,
}
impl ObjectInfo {
pub fn new(
name: &str,
mtime_str: &str,
etag: &str,
size: i64,
storage_class: &str,
metadata: HashMap<String, String>,
) -> Result<ObjectInfo, Err> {
parse_aws_time(mtime_str).and_then(|mtime| {
Ok(ObjectInfo {
name: name.to_string(),
modified_time: mtime,
etag: etag.to_string(),
size: size,
storage_class: storage_class.to_string(),
metadata: metadata,
})
})
}
}
#[derive(Debug)]
pub struct ListObjectsResp {
pub bucket_name: String,
pub prefix: String,
pub max_keys: i32,
pub key_count: i32,
pub is_truncated: bool,
pub object_infos: Vec<ObjectInfo>,
pub common_prefixes: Vec<String>,
pub next_continuation_token: String,
}

View File

@ -1,7 +1,9 @@
use crate::minio::types::{BucketInfo, Err, Region};
use crate::minio::types::{BucketInfo, Err, ListObjectsResp, ObjectInfo, Region};
use crate::minio::woxml;
use hyper::body::Body;
use roxmltree;
use std::collections::HashMap;
use std::str::FromStr;
pub fn parse_bucket_location(s: String) -> Result<Region, Err> {
let res = roxmltree::Document::parse(&s);
@ -14,7 +16,7 @@ pub fn parse_bucket_location(s: String) -> Result<Region, Err> {
Ok(Region::empty())
}
}
Err(e) => Err(Err::XmlParseErr(e)),
Err(e) => Err(Err::XmlDocParseErr(e)),
}
}
@ -47,7 +49,15 @@ pub fn parse_bucket_list(s: String) -> Result<Vec<BucketInfo>, Err> {
}
Ok(bucket_infos)
}
Err(err) => Err(Err::XmlParseErr(err)),
Err(err) => Err(Err::XmlDocParseErr(err)),
}
}
pub fn parse_list_objects(s: String) -> Result<ListObjectsResp, Err> {
let doc_res = roxmltree::Document::parse(&s);
match doc_res {
Ok(doc) => parse_list_objects_result(doc),
Err(err) => panic!(err),
}
}
@ -63,3 +73,88 @@ pub fn get_mk_bucket_body() -> Result<Body, Err> {
.or_else(|err| Err(Err::XmlWriteErr(err.to_string())))?;
Ok(Body::from(xml_bytes))
}
fn get_child_node(node: roxmltree::Node, tag_name: &str) -> Option<String> {
node.children()
.find(|node| node.has_tag_name(tag_name))
.and_then(|node| node.text())
.map(|x| x.to_string())
}
// gets text value inside given tag or return default
fn get_child_node_or(node: roxmltree::Node, tag_name: &str, default: &str) -> String {
get_child_node(node, tag_name).unwrap_or(default.to_string())
}
fn parse_child_content<T>(node: roxmltree::Node, tag: &str) -> Result<T, Err>
where
T: FromStr,
{
let content = get_child_node(node, tag).ok_or(Err::XmlElemMissing(format!("{:?}", tag)))?;
str::parse::<T>(content.as_str()).map_err(|_| Err::XmlElemParseErr(format!("{}", tag)))
}
fn parse_tag_content<T>(node: roxmltree::Node) -> Result<T, Err>
where
T: FromStr,
{
let content = node
.text()
.ok_or(Err::XmlElemMissing(format!("{:?}", node.tag_name())))?;
str::parse::<T>(content).map_err(|_| Err::XmlElemParseErr(format!("{:?}", node.tag_name())))
}
fn parse_object_infos(node: roxmltree::Node) -> Result<Vec<ObjectInfo>, Err> {
let mut object_infos: Vec<ObjectInfo> = Vec::new();
let contents_nodes = node
.descendants()
.filter(|node| node.has_tag_name("Contents"));
for node in contents_nodes {
let keys = node.children().filter(|node| node.has_tag_name("Key"));
let mtimes = node
.children()
.filter(|node| node.has_tag_name("LastModified"));
let etags = node.children().filter(|node| node.has_tag_name("ETag"));
let sizes = node.children().filter(|node| node.has_tag_name("Size"));
let storage_classes = node
.children()
.filter(|node| node.has_tag_name("StorageClass"));
for (key, (mtime, (etag, (size, storage_class)))) in
keys.zip(mtimes.zip(etags.zip(sizes.zip(storage_classes))))
{
let sz: i64 = parse_tag_content(size)?;
let object_info = ObjectInfo::new(
key.text().unwrap(),
mtime.text().unwrap(),
etag.text().unwrap(),
sz,
storage_class.text().unwrap(),
HashMap::new(),
)?;
object_infos.push(object_info);
}
}
Ok(object_infos)
}
fn parse_list_objects_result(doc: roxmltree::Document) -> Result<ListObjectsResp, Err> {
let root = doc.root_element();
let bucket_name =
get_child_node(root, "Name").ok_or(Err::XmlElemMissing("Name".to_string()))?;
let prefix = get_child_node_or(root, "Prefix", "");
let key_count: i32 = parse_child_content(root, "KeyCount")?;
let max_keys: i32 = parse_child_content(root, "MaxKeys")?;
let is_truncated: bool = parse_child_content(root, "IsTruncated")?;
let object_infos = parse_object_infos(root)?;
Ok(ListObjectsResp {
bucket_name: bucket_name,
prefix: prefix,
max_keys: max_keys,
key_count: key_count,
is_truncated: is_truncated,
next_continuation_token: "".to_string(),
common_prefixes: Vec::new(),
object_infos: object_infos,
})
}