Get object request

This commit is contained in:
Aditya Manthramurthy 2019-05-31 14:54:16 -07:00
parent a40a6e15ed
commit 8647dccfde
3 changed files with 106 additions and 3 deletions

View File

@ -1,6 +1,6 @@
mod minio;
use futures::future::Future;
use futures::{future::Future, stream::Stream};
use hyper::rt;
fn get_local_default_server() -> minio::Client {
@ -34,6 +34,17 @@ fn main() {
.map(move |e| println!("Bucket {} exists: {}", bucket, e))
.map_err(|err| println!("exists err: {:?}", err));
del_req.join3(region_req, buc_exists_req).map(|_| ())
let download_req = c
.get_object_req(bucket, "issue", vec![])
.and_then(|g| {
println!("issue: {} {} {:?}", g.object_size, g.etag, g.content_type);
g.get_object_stream().concat2()
})
.map(|c| println!("get obj res: {:?}", c))
.map_err(|c| println!("err res: {:?}", c));
del_req
.join4(region_req, buc_exists_req, download_req)
.map(|_| ())
}));
}

View File

@ -15,7 +15,7 @@ use std::env;
use std::{string, string::String};
use time;
use time::Tm;
use types::{Err, Region};
use types::{Err, GetObjectResp, Region};
#[derive(Debug, Clone)]
pub struct Credentials {
@ -233,6 +233,33 @@ impl Client {
Err(err) => Err(err),
})
}
pub fn get_object_req(
&self,
b: &str,
key: &str,
get_obj_opts: Vec<(HeaderName, HeaderValue)>,
) -> impl Future<Item = GetObjectResp, Error = Err> {
let mut h = HeaderMap::new();
get_obj_opts
.iter()
.map(|(x, y)| (x.clone(), y.clone()))
.for_each(|(k, v)| {
h.insert(k, v);
});
let s3_req = S3Req {
method: Method::GET,
bucket: Some(b.to_string()),
object: Some(key.to_string()),
headers: h,
query: HashMap::new(),
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req).and_then(GetObjectResp::new)
}
}
fn b2s(b: Bytes) -> Result<String, Err> {

View File

@ -1,4 +1,9 @@
use bytes::Bytes;
use futures::stream::Stream;
use hyper::header::{
HeaderMap, HeaderValue, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_TYPE, ETAG, EXPIRES,
};
use hyper::{body::Body, Response};
use roxmltree;
use std::string;
@ -28,5 +33,65 @@ pub enum Err {
FailStatusCodeErr(hyper::StatusCode, Bytes),
Utf8DecodingErr(string::FromUtf8Error),
XmlParseErr(roxmltree::Error),
MissingRequiredParams,
RawSvcErr(hyper::StatusCode, Response<Body>),
}
pub struct GetObjectResp {
pub user_metadata: Vec<(String, String)>,
pub object_size: u64,
pub etag: String,
// standard headers
pub content_type: Option<String>,
pub content_language: Option<String>,
pub expires: Option<String>,
pub cache_control: Option<String>,
pub content_disposition: Option<String>,
pub content_encoding: Option<String>,
resp: Response<Body>,
}
impl GetObjectResp {
pub fn new(r: Response<Body>) -> Result<GetObjectResp, Err> {
let h = r.headers();
let cl_opt = hv2s(h.get(CONTENT_LENGTH)).and_then(|l| l.parse::<u64>().ok());
let etag_opt = hv2s(h.get(ETAG));
match (cl_opt, etag_opt) {
(Some(cl), Some(etag)) => Ok(GetObjectResp {
user_metadata: extract_user_meta(h),
object_size: cl,
etag: etag,
content_type: hv2s(h.get(CONTENT_TYPE)),
content_language: hv2s(h.get(CONTENT_LANGUAGE)),
expires: hv2s(h.get(EXPIRES)),
cache_control: hv2s(h.get(CACHE_CONTROL)),
content_disposition: hv2s(h.get(CONTENT_DISPOSITION)),
content_encoding: hv2s(h.get(CONTENT_ENCODING)),
resp: r,
}),
_ => Err(Err::MissingRequiredParams),
}
}
// Consumes GetObjectResp
pub fn get_object_stream(self) -> impl Stream<Item = hyper::Chunk, Error = Err> {
self.resp.into_body().map_err(|err| Err::HyperErr(err))
}
}
fn hv2s(o: Option<&HeaderValue>) -> Option<String> {
o.and_then(|v| v.to_str().ok()).map(|x| x.to_string())
}
fn extract_user_meta(h: &HeaderMap) -> Vec<(String, String)> {
h.iter()
.map(|(k, v)| (k.as_str(), v.to_str()))
.filter(|(k, v)| k.to_lowercase().starts_with("x-amz-meta-") && v.is_ok())
.map(|(k, v)| (k.to_string(), v.unwrap_or("").to_string()))
.collect()
}