diff --git a/Cargo.toml b/Cargo.toml index 8aa5754..9bc559a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "minio-rs" version = "0.1.0" -authors = ["Aditya Manthramurthy "] +authors = ["Aditya Manthramurthy ", "Krishnan Parthasarathi "] edition = "2018" [dependencies] @@ -13,3 +13,5 @@ hyper-tls = "0.3.2" ring = "0.14.6" roxmltree = "0.6.0" time = "0.1.42" +xml-rs = "0.8.0" + diff --git a/src/main.rs b/src/main.rs index fffb8bf..ec2b3e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,6 +34,11 @@ fn main() { .map(move |e| println!("Bucket {} exists: {}", bucket, e)) .map_err(|err| println!("exists err: {:?}", err)); + let make_bucket_req = c + .make_bucket(bucket) + .map(move |_| println!("Bucket {} created", bucket)) + .map_err(move |err| println!("Bucket create for {} failed with {:?}", bucket, err)); + let download_req = c .get_object_req(bucket, "issue", vec![]) .and_then(|g| { @@ -44,7 +49,7 @@ fn main() { .map_err(|c| println!("err res: {:?}", c)); del_req - .join4(region_req, buc_exists_req, download_req) + .join5(make_bucket_req, region_req, buc_exists_req, download_req) .map(|_| ()) })); } diff --git a/src/minio.rs b/src/minio.rs index db9d0b8..3549a68 100644 --- a/src/minio.rs +++ b/src/minio.rs @@ -3,6 +3,8 @@ mod sign; mod types; mod xml; +mod woxml; + use bytes::Bytes; use futures::future::{self, Future}; use futures::stream::Stream; @@ -12,9 +14,10 @@ use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Resp use hyper_tls::HttpsConnector; use std::collections::HashMap; use std::env; -use std::{string, string::String}; +use std::string::String; use time; use time::Tm; + use types::{Err, GetObjectResp, Region}; #[derive(Debug, Clone)] @@ -132,6 +135,7 @@ impl Client { fn signed_req_future( &self, mut s3_req: S3Req, + body_res: Result, ) -> impl Future, Error = Err> { let hmap = &mut s3_req.headers; self.add_host_header(hmap); @@ -141,13 +145,18 @@ impl Client { HeaderValue::from_static("UNSIGNED-PAYLOAD"), ); hmap.insert(body_hash_hdr.0.clone(), body_hash_hdr.1.clone()); - - let sign_hdrs = sign::sign_v4(&s3_req, &self); - println!("signout: {:?}", sign_hdrs); - let req_result = api::mk_request(&s3_req, &self, &sign_hdrs); + let creds = self.credentials.clone(); + let region = self.region.clone(); + let server_addr = self.server.to_string(); let conn_client = self.conn_client.clone(); - future::result(req_result) - .map_err(|e| Err::HttpErr(e)) + + future::result(body_res) + .and_then(move |body| { + s3_req.body = body; + let sign_hdrs = sign::sign_v4(&s3_req, creds, region); + println!("signout: {:?}", sign_hdrs); + api::mk_request(&s3_req, &server_addr, &sign_hdrs) + }) .and_then(move |req| conn_client.make_req(req).map_err(|e| Err::HyperErr(e))) .and_then(|resp| { let st = resp.status(); @@ -187,14 +196,15 @@ impl Client { body: Body::empty(), ts: time::now_utc(), }; - self.signed_req_future(s3_req).and_then(|resp| { - // Read the whole body for bucket location response. - resp.into_body() - .concat2() - .map_err(|err| Err::HyperErr(err)) - .and_then(move |chunk| b2s(chunk.into_bytes())) - .and_then(|s| xml::parse_bucket_location(s)) - }) + self.signed_req_future(s3_req, Ok(Body::empty())) + .and_then(|resp| { + // Read the whole body for bucket location response. + resp.into_body() + .concat2() + .map_err(|err| Err::HyperErr(err)) + .and_then(move |chunk| b2s(chunk.into_bytes())) + .and_then(|s| xml::parse_bucket_location(s)) + }) } pub fn delete_bucket(&self, b: &str) -> impl Future { @@ -207,7 +217,8 @@ impl Client { body: Body::empty(), ts: time::now_utc(), }; - self.signed_req_future(s3_req).and_then(|_| Ok(())) + self.signed_req_future(s3_req, Ok(Body::empty())) + .and_then(|_| Ok(())) } pub fn bucket_exists(&self, b: &str) -> impl Future { @@ -220,18 +231,19 @@ impl Client { body: Body::empty(), ts: time::now_utc(), }; - self.signed_req_future(s3_req).then(|res| match res { - Ok(_) => Ok(true), - Err(Err::FailStatusCodeErr(st, b)) => { - let code = st.as_u16(); - if code == 404 { - Ok(false) - } else { - Err(Err::FailStatusCodeErr(st, b)) + self.signed_req_future(s3_req, Ok(Body::empty())) + .then(|res| match res { + Ok(_) => Ok(true), + Err(Err::FailStatusCodeErr(st, b)) => { + let code = st.as_u16(); + if code == 404 { + Ok(false) + } else { + Err(Err::FailStatusCodeErr(st, b)) + } } - } - Err(err) => Err(err), - }) + Err(err) => Err(err), + }) } pub fn get_object_req( @@ -258,8 +270,42 @@ impl Client { ts: time::now_utc(), }; - self.signed_req_future(s3_req).and_then(GetObjectResp::new) + self.signed_req_future(s3_req, Ok(Body::empty())) + .and_then(GetObjectResp::new) } + + pub fn make_bucket(&self, b: &str) -> impl Future { + let xml_body_res = xml::get_mk_bucket_body(); + let bucket = b.clone().to_string(); + let s3_req = S3Req { + method: Method::PUT, + bucket: Some(bucket), + object: None, + query: HashMap::new(), + headers: HeaderMap::new(), + body: Body::empty(), + ts: time::now_utc(), + }; + self.signed_req_future(s3_req, xml_body_res) + .and_then(|_| future::ok(())) + } +} + +fn run_req_future( + req_result: Result, Err>, + c: ConnClient, +) -> impl Future, Error = Err> { + future::result(req_result) + //.map_err(|e| Err::HttpErr(e)) + .and_then(move |req| c.make_req(req).map_err(|e| Err::HyperErr(e))) + .and_then(|resp| { + let st = resp.status(); + if st.is_success() { + Ok(resp) + } else { + Err(Err::RawSvcErr(st, resp)) + } + }) } fn b2s(b: Bytes) -> Result { diff --git a/src/minio/api.rs b/src/minio/api.rs index 51764ee..c23f51e 100644 --- a/src/minio/api.rs +++ b/src/minio/api.rs @@ -1,14 +1,12 @@ use crate::minio; -use http; use hyper::{header::HeaderName, header::HeaderValue, Body, Request}; pub fn mk_request( r: &minio::S3Req, - c: &minio::Client, + svr_str: &str, sign_hdrs: &Vec<(HeaderName, HeaderValue)>, -) -> http::Result> { +) -> Result, minio::Err> { let mut request = Request::builder(); - let svr_str = &c.server.to_string(); let uri_str = svr_str.trim_end_matches('/'); println!("uri_str: {}", uri_str); let upd_uri = format!("{}{}?{}", uri_str, r.mk_path(), r.mk_query()); @@ -23,5 +21,7 @@ pub fn mk_request( { request.header(hdr.0, hdr.1); } - request.body(Body::empty()) + request + .body(Body::empty()) + .map_err(|err| minio::Err::HttpErr(err)) } diff --git a/src/minio/sign.rs b/src/minio/sign.rs index 8777c9c..ea47cab 100644 --- a/src/minio/sign.rs +++ b/src/minio/sign.rs @@ -6,6 +6,7 @@ use std::collections::{HashMap, HashSet}; use time::Tm; use crate::minio; +use crate::minio::types::Region; fn aws_format_time(t: &Tm) -> String { t.strftime("%Y%m%dT%H%M%SZ").unwrap().to_string() @@ -142,9 +143,13 @@ fn compute_sign(str_to_sign: &str, key: &Vec) -> String { s1.as_ref().iter().map(|x| format!("{:02x}", x)).collect() } -pub fn sign_v4(r: &minio::S3Req, c: &minio::Client) -> Vec<(HeaderName, HeaderValue)> { - c.credentials.clone().map_or(Vec::new(), |creds| { - let scope = mk_scope(&r.ts, &c.region); +pub fn sign_v4( + r: &minio::S3Req, + creds: Option, + region: Region, +) -> Vec<(HeaderName, HeaderValue)> { + creds.map_or(Vec::new(), |creds| { + let scope = mk_scope(&r.ts, ®ion); let date_hdr = ( HeaderName::from_static("x-amz-date"), HeaderValue::from_str(&aws_format_time(&r.ts)).unwrap(), @@ -162,7 +167,7 @@ pub fn sign_v4(r: &minio::S3Req, c: &minio::Client) -> Vec<(HeaderName, HeaderVa println!("canonicalreq: {}", cr); let s2s = string_to_sign(&r.ts, &scope, &cr); println!("s2s: {}", s2s); - let skey = get_signing_key(&r.ts, &c.region.to_string(), &creds.secret_key); + let skey = get_signing_key(&r.ts, ®ion.to_string(), &creds.secret_key); println!("skey: {:?}", skey); let signature = compute_sign(&s2s, &skey); println!("sign: {}", signature); diff --git a/src/minio/types.rs b/src/minio/types.rs index 3010de0..a6dd06b 100644 --- a/src/minio/types.rs +++ b/src/minio/types.rs @@ -8,6 +8,7 @@ use hyper::{body::Body, Response}; use roxmltree; use std::string; +#[derive(Clone)] pub struct Region(String); impl Region { @@ -35,6 +36,7 @@ pub enum Err { XmlParseErr(roxmltree::Error), MissingRequiredParams, RawSvcErr(hyper::StatusCode, Response), + XmlWriteErr(String), } pub struct GetObjectResp { diff --git a/src/minio/woxml.rs b/src/minio/woxml.rs new file mode 100644 index 0000000..0faeb33 --- /dev/null +++ b/src/minio/woxml.rs @@ -0,0 +1,74 @@ +extern crate xml; + +use xml::writer::{EmitterConfig, EventWriter, XmlEvent}; + +pub struct XmlNode { + name: String, + namespace: Option, + text: Option, + children: Vec, +} + +impl XmlNode { + pub fn new(name: &str) -> XmlNode { + XmlNode { + name: name.to_string(), + namespace: None, + text: None, + children: Vec::new(), + } + } + pub fn namespace(mut self, ns: &str) -> XmlNode { + self.namespace = Some(ns.to_string()); + self + } + + pub fn text(mut self, value: &str) -> XmlNode { + self.text = Some(value.to_string()); + self + } + + pub fn children(mut self, kids: Vec) -> XmlNode { + self.children = kids; + self + } + + fn serialize_rec(&self, xml_writer: &mut EventWriter) -> xml::writer::Result<()> + where + W: std::io::Write, + { + let st_elem = XmlEvent::start_element(self.name.as_str()); + let st_elem = match &self.namespace { + Some(ns) => st_elem.ns("", ns.clone()), + None => st_elem, + }; + xml_writer.write(st_elem)?; + + // An xml node would have a text field or child nodes, not both, at least not usually. + match &self.text { + Some(content) => { + let content_node = XmlEvent::characters(content.as_str()); + xml_writer.write(content_node)?; + } + None => { + for child in &self.children { + child.serialize_rec(xml_writer)?; + } + } + } + + let end_elem: XmlEvent = XmlEvent::end_element().name(self.name.as_str()).into(); + xml_writer.write(end_elem)?; + + Ok(()) + } + pub fn serialize(&self, writer: W) -> xml::writer::Result<()> + where + W: std::io::Write, + { + let mut xml_writer = EmitterConfig::new() + .perform_indent(true) + .create_writer(writer); + self.serialize_rec(&mut xml_writer) + } +} diff --git a/src/minio/xml.rs b/src/minio/xml.rs index 9f48664..3852504 100644 --- a/src/minio/xml.rs +++ b/src/minio/xml.rs @@ -1,4 +1,6 @@ use crate::minio::types::{Err, Region}; +use crate::minio::woxml; +use hyper::body::Body; use roxmltree; pub fn parse_bucket_location(s: String) -> Result { @@ -15,3 +17,16 @@ pub fn parse_bucket_location(s: String) -> Result { Err(e) => Err(Err::XmlParseErr(e)), } } + +pub fn get_mk_bucket_body() -> Result { + let lc_node = woxml::XmlNode::new("LocationConstraint").text("us-east-1"); + let mk_bucket_xml = woxml::XmlNode::new("CreateBucketConfiguration") + .namespace("http://s3.amazonaws.com/doc/2006-03-01/") + .children(vec![lc_node]); + let mut xml_bytes = Vec::new(); + + mk_bucket_xml + .serialize(&mut xml_bytes) + .or_else(|err| Err(Err::XmlWriteErr(err.to_string())))?; + Ok(Body::from(xml_bytes)) +}