Refactor to use enum for conn client

This commit is contained in:
Aditya Manthramurthy 2019-05-30 13:58:17 -07:00
parent eb35be48d3
commit f310d96d37
2 changed files with 57 additions and 53 deletions

View File

@ -15,8 +15,8 @@ fn get_local_default_server() -> minio::Client {
fn main() {
rt::run(rt::lazy(|| {
// let c = get_local_default_server();
let c = minio::Client::get_play_client();
let c = get_local_default_server();
// let c = minio::Client::get_play_client();
c.get_bucket_location("txp")
.map(|res| println!("{}", res))
.map_err(|err| println!("{:?}", err))

View File

@ -6,7 +6,7 @@ use futures::future::{self, Future};
use futures::stream::Stream;
use http;
use hyper::header::{HeaderName, HeaderValue};
use hyper::{body::Body, client, header, header::HeaderMap, Method, Uri};
use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Response, Uri};
use hyper_tls::HttpsConnector;
use std::collections::HashMap;
use std::env;
@ -50,36 +50,28 @@ pub enum Err {
HyperErr(hyper::Error),
FailStatusCodeErr(hyper::StatusCode, Bytes),
Utf8DecodingErr(string::FromUtf8Error),
RawSvcErr(hyper::StatusCode, Response<Body>),
}
trait ApiClient {
fn make_req(&self, req: http::Request<Body>) -> client::ResponseFuture;
#[derive(Clone)]
enum ConnClient {
HttpCC(client::Client<client::HttpConnector, Body>),
HttpsCC(client::Client<HttpsConnector<client::HttpConnector>, Body>),
}
struct HttpApiClient {
c: client::Client<client::HttpConnector, Body>,
}
impl ApiClient for HttpApiClient {
impl ConnClient {
fn make_req(&self, req: http::Request<Body>) -> client::ResponseFuture {
self.c.request(req)
}
}
struct HttpsApiClient {
c: client::Client<HttpsConnector<client::HttpConnector>, Body>,
}
impl ApiClient for HttpsApiClient {
fn make_req(&self, req: http::Request<Body>) -> client::ResponseFuture {
self.c.request(req)
match self {
ConnClient::HttpCC(c) => c.request(req),
ConnClient::HttpsCC(c) => c.request(req),
}
}
}
pub struct Client {
server: Uri,
region: Region,
conn_client: Arc<Box<dyn ApiClient + Send + Sync>>,
conn_client: ConnClient,
pub credentials: Option<Credentials>,
}
@ -97,14 +89,12 @@ impl Client {
server: s.clone(),
region: String::from(""),
conn_client: if s.scheme_str() == Some("http") {
Arc::new(Box::new(HttpApiClient {
c: client::Client::new(),
}))
ConnClient::HttpCC(client::Client::new())
} else {
let https = HttpsConnector::new(4).unwrap();
Arc::new(Box::new(HttpsApiClient {
c: client::Client::builder().build::<_, hyper::Body>(https),
}))
ConnClient::HttpsCC(
client::Client::builder().build::<_, hyper::Body>(https),
)
},
credentials: None,
})
@ -141,9 +131,7 @@ impl Client {
region: String::from(""),
conn_client: {
let https = HttpsConnector::new(4).unwrap();
Arc::new(Box::new(HttpsApiClient {
c: client::Client::builder().build::<_, hyper::Body>(https),
}))
ConnClient::HttpsCC(client::Client::builder().build::<_, hyper::Body>(https))
},
credentials: Some(Credentials::new(
"Q3AM3UQ867SPQQA43P2F",
@ -176,31 +164,47 @@ impl Client {
let sign_hdrs = sign::sign_v4(&s3_req, &self);
println!("signout: {:?}", sign_hdrs);
let req_result = api::mk_request(&s3_req, &self, &sign_hdrs);
let conn_client = Arc::clone(&self.conn_client);
future::result(req_result)
.map_err(|e| Err::HttpErr(e))
.and_then(move |req| {
conn_client
.make_req(req)
.map_err(|e| Err::HyperErr(e))
.and_then(|resp| {
let st = resp.status();
resp.into_body()
.concat2()
.map_err(|err| Err::HyperErr(err))
.map(move |chunk| (st, chunk.into_bytes()))
})
.and_then(|(code, body)| {
if code.is_success() {
b2s(body)
} else {
Err(Err::FailStatusCodeErr(code, body))
}
})
})
let conn_client = self.conn_client.clone();
run_req_future(req_result, conn_client).and_then(|resp| {
// Read the whole body for bucket location response.
resp.into_body()
.concat2()
.map_err(|err| Err::HyperErr(err))
.and_then(move |chunk| b2s(chunk.into_bytes()))
})
}
}
fn run_req_future(
req_result: http::Result<Request<Body>>,
c: ConnClient,
) -> impl Future<Item = Response<Body>, Error = Err> {
future::result(req_result)
.map_err(|e| Err::HttpErr(e))
.and_then(move |req| c.make_req(req).map_err(|e| Err::HyperErr(e)))
.and_then(|resp| {
let st = resp.status();
if st.is_success() {
Ok(resp)
} else {
Err(Err::RawSvcErr(st, resp))
}
})
.or_else(|err| {
future::err(err)
.or_else(|x| match x {
Err::RawSvcErr(st, resp) => Ok((st, resp)),
other_err => Err(other_err),
})
.and_then(|(st, resp)| {
resp.into_body()
.concat2()
.map_err(|err| Err::HyperErr(err))
.and_then(move |chunk| Err(Err::FailStatusCodeErr(st, chunk.into_bytes())))
})
})
}
fn b2s(b: Bytes) -> Result<String, Err> {
match String::from_utf8(b.iter().map(|x| x.clone()).collect::<Vec<u8>>()) {
Err(e) => Err(Err::Utf8DecodingErr(e)),