From f310d96d37b012392a48665b51e4f637c52b8654 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Thu, 30 May 2019 13:58:17 -0700 Subject: [PATCH] Refactor to use enum for conn client --- src/main.rs | 4 +- src/minio.rs | 106 ++++++++++++++++++++++++++------------------------- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/src/main.rs b/src/main.rs index 4034743..4438099 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,8 +15,8 @@ fn get_local_default_server() -> minio::Client { fn main() { rt::run(rt::lazy(|| { - // let c = get_local_default_server(); - let c = minio::Client::get_play_client(); + let c = get_local_default_server(); + // let c = minio::Client::get_play_client(); c.get_bucket_location("txp") .map(|res| println!("{}", res)) .map_err(|err| println!("{:?}", err)) diff --git a/src/minio.rs b/src/minio.rs index 8894eaf..1f0dc29 100644 --- a/src/minio.rs +++ b/src/minio.rs @@ -6,7 +6,7 @@ use futures::future::{self, Future}; use futures::stream::Stream; use http; use hyper::header::{HeaderName, HeaderValue}; -use hyper::{body::Body, client, header, header::HeaderMap, Method, Uri}; +use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Response, Uri}; use hyper_tls::HttpsConnector; use std::collections::HashMap; use std::env; @@ -50,36 +50,28 @@ pub enum Err { HyperErr(hyper::Error), FailStatusCodeErr(hyper::StatusCode, Bytes), Utf8DecodingErr(string::FromUtf8Error), + RawSvcErr(hyper::StatusCode, Response), } -trait ApiClient { - fn make_req(&self, req: http::Request) -> client::ResponseFuture; +#[derive(Clone)] +enum ConnClient { + HttpCC(client::Client), + HttpsCC(client::Client, Body>), } -struct HttpApiClient { - c: client::Client, -} - -impl ApiClient for HttpApiClient { +impl ConnClient { fn make_req(&self, req: http::Request) -> client::ResponseFuture { - self.c.request(req) - } -} - -struct HttpsApiClient { - c: client::Client, Body>, -} - -impl ApiClient for HttpsApiClient { - fn make_req(&self, req: http::Request) -> client::ResponseFuture { - self.c.request(req) + match self { + ConnClient::HttpCC(c) => c.request(req), + ConnClient::HttpsCC(c) => c.request(req), + } } } pub struct Client { server: Uri, region: Region, - conn_client: Arc>, + conn_client: ConnClient, pub credentials: Option, } @@ -97,14 +89,12 @@ impl Client { server: s.clone(), region: String::from(""), conn_client: if s.scheme_str() == Some("http") { - Arc::new(Box::new(HttpApiClient { - c: client::Client::new(), - })) + ConnClient::HttpCC(client::Client::new()) } else { let https = HttpsConnector::new(4).unwrap(); - Arc::new(Box::new(HttpsApiClient { - c: client::Client::builder().build::<_, hyper::Body>(https), - })) + ConnClient::HttpsCC( + client::Client::builder().build::<_, hyper::Body>(https), + ) }, credentials: None, }) @@ -141,9 +131,7 @@ impl Client { region: String::from(""), conn_client: { let https = HttpsConnector::new(4).unwrap(); - Arc::new(Box::new(HttpsApiClient { - c: client::Client::builder().build::<_, hyper::Body>(https), - })) + ConnClient::HttpsCC(client::Client::builder().build::<_, hyper::Body>(https)) }, credentials: Some(Credentials::new( "Q3AM3UQ867SPQQA43P2F", @@ -176,31 +164,47 @@ impl Client { let sign_hdrs = sign::sign_v4(&s3_req, &self); println!("signout: {:?}", sign_hdrs); let req_result = api::mk_request(&s3_req, &self, &sign_hdrs); - let conn_client = Arc::clone(&self.conn_client); - future::result(req_result) - .map_err(|e| Err::HttpErr(e)) - .and_then(move |req| { - conn_client - .make_req(req) - .map_err(|e| Err::HyperErr(e)) - .and_then(|resp| { - let st = resp.status(); - resp.into_body() - .concat2() - .map_err(|err| Err::HyperErr(err)) - .map(move |chunk| (st, chunk.into_bytes())) - }) - .and_then(|(code, body)| { - if code.is_success() { - b2s(body) - } else { - Err(Err::FailStatusCodeErr(code, body)) - } - }) - }) + let conn_client = self.conn_client.clone(); + run_req_future(req_result, conn_client).and_then(|resp| { + // Read the whole body for bucket location response. + resp.into_body() + .concat2() + .map_err(|err| Err::HyperErr(err)) + .and_then(move |chunk| b2s(chunk.into_bytes())) + }) } } +fn run_req_future( + req_result: http::Result>, + c: ConnClient, +) -> impl Future, Error = Err> { + future::result(req_result) + .map_err(|e| Err::HttpErr(e)) + .and_then(move |req| c.make_req(req).map_err(|e| Err::HyperErr(e))) + .and_then(|resp| { + let st = resp.status(); + if st.is_success() { + Ok(resp) + } else { + Err(Err::RawSvcErr(st, resp)) + } + }) + .or_else(|err| { + future::err(err) + .or_else(|x| match x { + Err::RawSvcErr(st, resp) => Ok((st, resp)), + other_err => Err(other_err), + }) + .and_then(|(st, resp)| { + resp.into_body() + .concat2() + .map_err(|err| Err::HyperErr(err)) + .and_then(move |chunk| Err(Err::FailStatusCodeErr(st, chunk.into_bytes()))) + }) + }) +} + fn b2s(b: Bytes) -> Result { match String::from_utf8(b.iter().map(|x| x.clone()).collect::>()) { Err(e) => Err(Err::Utf8DecodingErr(e)),