Add make_bucket api

This commit is contained in:
Krishnan Parthasarathi 2019-06-03 17:02:34 -07:00 committed by Aditya Manthramurthy
parent 8647dccfde
commit fe8d248fed
8 changed files with 188 additions and 39 deletions

View File

@ -1,7 +1,7 @@
[package]
name = "minio-rs"
version = "0.1.0"
authors = ["Aditya Manthramurthy <aditya.mmy@gmail.com>"]
authors = ["Aditya Manthramurthy <aditya.mmy@gmail.com>", "Krishnan Parthasarathi <krishnan.parthasarathi@gmail.com>"]
edition = "2018"
[dependencies]
@ -13,3 +13,5 @@ hyper-tls = "0.3.2"
ring = "0.14.6"
roxmltree = "0.6.0"
time = "0.1.42"
xml-rs = "0.8.0"

View File

@ -34,6 +34,11 @@ fn main() {
.map(move |e| println!("Bucket {} exists: {}", bucket, e))
.map_err(|err| println!("exists err: {:?}", err));
let make_bucket_req = c
.make_bucket(bucket)
.map(move |_| println!("Bucket {} created", bucket))
.map_err(move |err| println!("Bucket create for {} failed with {:?}", bucket, err));
let download_req = c
.get_object_req(bucket, "issue", vec![])
.and_then(|g| {
@ -44,7 +49,7 @@ fn main() {
.map_err(|c| println!("err res: {:?}", c));
del_req
.join4(region_req, buc_exists_req, download_req)
.join5(make_bucket_req, region_req, buc_exists_req, download_req)
.map(|_| ())
}));
}

View File

@ -3,6 +3,8 @@ mod sign;
mod types;
mod xml;
mod woxml;
use bytes::Bytes;
use futures::future::{self, Future};
use futures::stream::Stream;
@ -12,9 +14,10 @@ use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Resp
use hyper_tls::HttpsConnector;
use std::collections::HashMap;
use std::env;
use std::{string, string::String};
use std::string::String;
use time;
use time::Tm;
use types::{Err, GetObjectResp, Region};
#[derive(Debug, Clone)]
@ -132,6 +135,7 @@ impl Client {
fn signed_req_future(
&self,
mut s3_req: S3Req,
body_res: Result<Body, Err>,
) -> impl Future<Item = Response<Body>, Error = Err> {
let hmap = &mut s3_req.headers;
self.add_host_header(hmap);
@ -141,13 +145,18 @@ impl Client {
HeaderValue::from_static("UNSIGNED-PAYLOAD"),
);
hmap.insert(body_hash_hdr.0.clone(), body_hash_hdr.1.clone());
let sign_hdrs = sign::sign_v4(&s3_req, &self);
println!("signout: {:?}", sign_hdrs);
let req_result = api::mk_request(&s3_req, &self, &sign_hdrs);
let creds = self.credentials.clone();
let region = self.region.clone();
let server_addr = self.server.to_string();
let conn_client = self.conn_client.clone();
future::result(req_result)
.map_err(|e| Err::HttpErr(e))
future::result(body_res)
.and_then(move |body| {
s3_req.body = body;
let sign_hdrs = sign::sign_v4(&s3_req, creds, region);
println!("signout: {:?}", sign_hdrs);
api::mk_request(&s3_req, &server_addr, &sign_hdrs)
})
.and_then(move |req| conn_client.make_req(req).map_err(|e| Err::HyperErr(e)))
.and_then(|resp| {
let st = resp.status();
@ -187,14 +196,15 @@ impl Client {
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req).and_then(|resp| {
// Read the whole body for bucket location response.
resp.into_body()
.concat2()
.map_err(|err| Err::HyperErr(err))
.and_then(move |chunk| b2s(chunk.into_bytes()))
.and_then(|s| xml::parse_bucket_location(s))
})
self.signed_req_future(s3_req, Ok(Body::empty()))
.and_then(|resp| {
// Read the whole body for bucket location response.
resp.into_body()
.concat2()
.map_err(|err| Err::HyperErr(err))
.and_then(move |chunk| b2s(chunk.into_bytes()))
.and_then(|s| xml::parse_bucket_location(s))
})
}
pub fn delete_bucket(&self, b: &str) -> impl Future<Item = (), Error = Err> {
@ -207,7 +217,8 @@ impl Client {
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req).and_then(|_| Ok(()))
self.signed_req_future(s3_req, Ok(Body::empty()))
.and_then(|_| Ok(()))
}
pub fn bucket_exists(&self, b: &str) -> impl Future<Item = bool, Error = Err> {
@ -220,18 +231,19 @@ impl Client {
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req).then(|res| match res {
Ok(_) => Ok(true),
Err(Err::FailStatusCodeErr(st, b)) => {
let code = st.as_u16();
if code == 404 {
Ok(false)
} else {
Err(Err::FailStatusCodeErr(st, b))
self.signed_req_future(s3_req, Ok(Body::empty()))
.then(|res| match res {
Ok(_) => Ok(true),
Err(Err::FailStatusCodeErr(st, b)) => {
let code = st.as_u16();
if code == 404 {
Ok(false)
} else {
Err(Err::FailStatusCodeErr(st, b))
}
}
}
Err(err) => Err(err),
})
Err(err) => Err(err),
})
}
pub fn get_object_req(
@ -258,8 +270,42 @@ impl Client {
ts: time::now_utc(),
};
self.signed_req_future(s3_req).and_then(GetObjectResp::new)
self.signed_req_future(s3_req, Ok(Body::empty()))
.and_then(GetObjectResp::new)
}
pub fn make_bucket(&self, b: &str) -> impl Future<Item = (), Error = Err> {
let xml_body_res = xml::get_mk_bucket_body();
let bucket = b.clone().to_string();
let s3_req = S3Req {
method: Method::PUT,
bucket: Some(bucket),
object: None,
query: HashMap::new(),
headers: HeaderMap::new(),
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req, xml_body_res)
.and_then(|_| future::ok(()))
}
}
fn run_req_future(
req_result: Result<Request<Body>, Err>,
c: ConnClient,
) -> impl Future<Item = Response<Body>, Error = Err> {
future::result(req_result)
//.map_err(|e| Err::HttpErr(e))
.and_then(move |req| c.make_req(req).map_err(|e| Err::HyperErr(e)))
.and_then(|resp| {
let st = resp.status();
if st.is_success() {
Ok(resp)
} else {
Err(Err::RawSvcErr(st, resp))
}
})
}
fn b2s(b: Bytes) -> Result<String, Err> {

View File

@ -1,14 +1,12 @@
use crate::minio;
use http;
use hyper::{header::HeaderName, header::HeaderValue, Body, Request};
pub fn mk_request(
r: &minio::S3Req,
c: &minio::Client,
svr_str: &str,
sign_hdrs: &Vec<(HeaderName, HeaderValue)>,
) -> http::Result<Request<Body>> {
) -> Result<Request<Body>, minio::Err> {
let mut request = Request::builder();
let svr_str = &c.server.to_string();
let uri_str = svr_str.trim_end_matches('/');
println!("uri_str: {}", uri_str);
let upd_uri = format!("{}{}?{}", uri_str, r.mk_path(), r.mk_query());
@ -23,5 +21,7 @@ pub fn mk_request(
{
request.header(hdr.0, hdr.1);
}
request.body(Body::empty())
request
.body(Body::empty())
.map_err(|err| minio::Err::HttpErr(err))
}

View File

@ -6,6 +6,7 @@ use std::collections::{HashMap, HashSet};
use time::Tm;
use crate::minio;
use crate::minio::types::Region;
fn aws_format_time(t: &Tm) -> String {
t.strftime("%Y%m%dT%H%M%SZ").unwrap().to_string()
@ -142,9 +143,13 @@ fn compute_sign(str_to_sign: &str, key: &Vec<u8>) -> String {
s1.as_ref().iter().map(|x| format!("{:02x}", x)).collect()
}
pub fn sign_v4(r: &minio::S3Req, c: &minio::Client) -> Vec<(HeaderName, HeaderValue)> {
c.credentials.clone().map_or(Vec::new(), |creds| {
let scope = mk_scope(&r.ts, &c.region);
pub fn sign_v4(
r: &minio::S3Req,
creds: Option<minio::Credentials>,
region: Region,
) -> Vec<(HeaderName, HeaderValue)> {
creds.map_or(Vec::new(), |creds| {
let scope = mk_scope(&r.ts, &region);
let date_hdr = (
HeaderName::from_static("x-amz-date"),
HeaderValue::from_str(&aws_format_time(&r.ts)).unwrap(),
@ -162,7 +167,7 @@ pub fn sign_v4(r: &minio::S3Req, c: &minio::Client) -> Vec<(HeaderName, HeaderVa
println!("canonicalreq: {}", cr);
let s2s = string_to_sign(&r.ts, &scope, &cr);
println!("s2s: {}", s2s);
let skey = get_signing_key(&r.ts, &c.region.to_string(), &creds.secret_key);
let skey = get_signing_key(&r.ts, &region.to_string(), &creds.secret_key);
println!("skey: {:?}", skey);
let signature = compute_sign(&s2s, &skey);
println!("sign: {}", signature);

View File

@ -8,6 +8,7 @@ use hyper::{body::Body, Response};
use roxmltree;
use std::string;
#[derive(Clone)]
pub struct Region(String);
impl Region {
@ -35,6 +36,7 @@ pub enum Err {
XmlParseErr(roxmltree::Error),
MissingRequiredParams,
RawSvcErr(hyper::StatusCode, Response<Body>),
XmlWriteErr(String),
}
pub struct GetObjectResp {

74
src/minio/woxml.rs Normal file
View File

@ -0,0 +1,74 @@
extern crate xml;
use xml::writer::{EmitterConfig, EventWriter, XmlEvent};
pub struct XmlNode {
name: String,
namespace: Option<String>,
text: Option<String>,
children: Vec<XmlNode>,
}
impl XmlNode {
pub fn new(name: &str) -> XmlNode {
XmlNode {
name: name.to_string(),
namespace: None,
text: None,
children: Vec::new(),
}
}
pub fn namespace(mut self, ns: &str) -> XmlNode {
self.namespace = Some(ns.to_string());
self
}
pub fn text(mut self, value: &str) -> XmlNode {
self.text = Some(value.to_string());
self
}
pub fn children(mut self, kids: Vec<XmlNode>) -> XmlNode {
self.children = kids;
self
}
fn serialize_rec<W>(&self, xml_writer: &mut EventWriter<W>) -> xml::writer::Result<()>
where
W: std::io::Write,
{
let st_elem = XmlEvent::start_element(self.name.as_str());
let st_elem = match &self.namespace {
Some(ns) => st_elem.ns("", ns.clone()),
None => st_elem,
};
xml_writer.write(st_elem)?;
// An xml node would have a text field or child nodes, not both, at least not usually.
match &self.text {
Some(content) => {
let content_node = XmlEvent::characters(content.as_str());
xml_writer.write(content_node)?;
}
None => {
for child in &self.children {
child.serialize_rec(xml_writer)?;
}
}
}
let end_elem: XmlEvent = XmlEvent::end_element().name(self.name.as_str()).into();
xml_writer.write(end_elem)?;
Ok(())
}
pub fn serialize<W>(&self, writer: W) -> xml::writer::Result<()>
where
W: std::io::Write,
{
let mut xml_writer = EmitterConfig::new()
.perform_indent(true)
.create_writer(writer);
self.serialize_rec(&mut xml_writer)
}
}

View File

@ -1,4 +1,6 @@
use crate::minio::types::{Err, Region};
use crate::minio::woxml;
use hyper::body::Body;
use roxmltree;
pub fn parse_bucket_location(s: String) -> Result<Region, Err> {
@ -15,3 +17,16 @@ pub fn parse_bucket_location(s: String) -> Result<Region, Err> {
Err(e) => Err(Err::XmlParseErr(e)),
}
}
pub fn get_mk_bucket_body() -> Result<Body, Err> {
let lc_node = woxml::XmlNode::new("LocationConstraint").text("us-east-1");
let mk_bucket_xml = woxml::XmlNode::new("CreateBucketConfiguration")
.namespace("http://s3.amazonaws.com/doc/2006-03-01/")
.children(vec![lc_node]);
let mut xml_bytes = Vec::new();
mk_bucket_xml
.serialize(&mut xml_bytes)
.or_else(|err| Err(Err::XmlWriteErr(err.to_string())))?;
Ok(Body::from(xml_bytes))
}