diff --git a/Cargo.toml b/Cargo.toml index 2793069..0c2b1ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,12 @@ pretty_env_logger = "0.3.0" ring = "0.14.6" roxmltree = "0.6.0" serde = "1.0.92" -serde_derive = "1.0.91" +serde_derive = "1.0.124" serde_json = "1.0.39" time = "0.1.42" -tokio = "0.1.21" -xml-rs = "0.8.0" +tokio = { version = "1", features = ["full"] } +xml-rs = "0.8.3" +quick-xml = { version = "0.22", features = [ "serialize" ] } +thiserror = "1.0.24" + diff --git a/src/lib.rs b/src/lib.rs index 130e1a3..39e19b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,6 @@ mod tests { use hyper::rt; use log::debug; - use minio::BucketInfo; use super::*; @@ -39,18 +38,23 @@ mod tests { #[test] fn test_lib_functions() { + println!("test func"); rt::run(rt::lazy(|| { let c = minio::Client::get_play_client(); - let bucket = "aaaa"; + let bucket_name = "aaaa"; - c.put_object_req(bucket, "hhhhhhhhhh", vec![], "object content".as_bytes().to_vec()) - .and_then(|g| { - debug!("object: {} {} {:?}", g.object_size, g.etag, g.content_type); - g.get_object_stream().concat2() - }) - .map(|c| debug!("get obj res: {:?}", c)) - .map_err(|c| debug!("err res: {:?}", c)) - .map(|_| {}) + c.put_object_req(bucket_name, "hhhhhhhhhh", vec![], "object content".as_bytes().to_vec()) + .and_then(|g| { + print!("object: {} {} {:?}", g.object_size, g.etag, g.content_type); + g.get_object_stream().concat2() + }) + .map(|c| { + println!("{:?}", c); + }) + .map_err(|c| { + println!("{:?}", c); + }) + .map(|_| {}) })); rt::run(rt::lazy(|| { @@ -58,13 +62,13 @@ mod tests { let bucket = "aaaa"; c.get_object_req(bucket, "hhhhhhhhhh", vec![]) - .and_then(|g| { - debug!("object: {} {} {:?}", g.object_size, g.etag, g.content_type); - g.get_object_stream().concat2() - }) - .map(|c| debug!("get obj res: {:?}", c)) - .map_err(|c| debug!("err res: {:?}", c)) - .map(|_| {}) + .and_then(|g| { + debug!("object: {} {} {:?}", g.object_size, g.etag, g.content_type); + g.get_object_stream().concat2() + }) + .map(|c| debug!("get obj res: {:?}", c)) + .map_err(|c| debug!("err res: {:?}", c)) + .map(|_| {}) })); rt::run(rt::lazy(|| { @@ -72,9 +76,9 @@ mod tests { let bucket = "aaaa"; c.delete_bucket(bucket) - .map(|_| debug!("Deleted!")) - .map_err(|err| debug!("del err: {:?}", err)) - .map(|_| {}) + .map(|_| debug!("Deleted!")) + .map_err(|err| debug!("del err: {:?}", err)) + .map(|_| {}) })); } } diff --git a/src/minio.rs b/src/minio.rs index f312778..60e9ab5 100644 --- a/src/minio.rs +++ b/src/minio.rs @@ -15,22 +15,28 @@ * limitations under the License. */ + use std::env; +use std::str; use std::string::String; use futures::future::{self, Future}; use futures::Stream; use http; -use hyper::header::{HeaderName, HeaderValue}; use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Response, Uri}; +use hyper::header::{HeaderName, HeaderValue}; use hyper_tls::HttpsConnector; +use log::debug; use time; use time::Tm; -pub use types::BucketInfo; use types::{Err, GetObjectResp, ListObjectsResp, Region}; +pub use types::BucketInfo; + use crate::minio::net::{Values, ValuesAccess}; +use crate::minio::xml::{parse_s3_error, S3GenericError}; +use bytes::Buf; mod api; mod api_notification; @@ -38,7 +44,6 @@ mod net; mod sign; mod types; mod xml; -use log::debug; mod woxml; pub const SPACE_BYTE: &[u8; 1] = b" "; @@ -161,7 +166,7 @@ impl Client { &self, mut s3_req: S3Req, body_res: Result, - ) -> impl Future, Error = Err> { + ) -> impl Future, Error=Err> { let hmap = &mut s3_req.headers; self.add_host_header(hmap); @@ -205,7 +210,16 @@ impl Client { .concat2() .map_err(|err| Err::HyperErr(err)) .and_then(move |chunk| { - Err(Err::FailStatusCodeErr(st, chunk.into_bytes())) + match st.as_str() { + "404" => { + let x = str::from_utf8(&chunk.bytes()); + let s3_err = parse_s3_error(x.unwrap()); + Err(Err::S3Error(s3_err)) + } + _ => { + Err(Err::FailStatusCodeErr(st, chunk.into_bytes())) + } + } }) }) }) @@ -215,7 +229,7 @@ impl Client { pub fn get_bucket_location( &self, bucket_name: &str, - ) -> impl Future { + ) -> impl Future { let mut qp = Values::new(); qp.set_value("location", None); @@ -239,7 +253,7 @@ impl Client { }) } - pub fn delete_bucket(&self, bucket_name: &str) -> impl Future { + pub fn delete_bucket(&self, bucket_name: &str) -> impl Future { let s3_req = S3Req { method: Method::DELETE, bucket: Some(bucket_name.to_string()), @@ -253,7 +267,7 @@ impl Client { .and_then(|_| Ok(())) } - pub fn bucket_exists(&self, bucket_name: &str) -> impl Future { + pub fn bucket_exists(&self, bucket_name: &str) -> impl Future { let s3_req = S3Req { method: Method::HEAD, bucket: Some(bucket_name.to_string()), @@ -283,7 +297,7 @@ impl Client { bucket_name: &str, key: &str, get_obj_opts: Vec<(HeaderName, HeaderValue)>, - ) -> impl Future { + ) -> impl Future { let mut h = HeaderMap::new(); get_obj_opts .iter() @@ -312,7 +326,7 @@ impl Client { key: &str, get_obj_opts: Vec<(HeaderName, HeaderValue)>, data: Vec, - ) -> impl Future { + ) -> impl Future { let mut h = HeaderMap::new(); get_obj_opts .iter() @@ -351,7 +365,7 @@ impl Client { .and_then(|_| future::ok(())) } - pub fn list_buckets(&self) -> impl Future, Error = Err> { + pub fn list_buckets(&self) -> impl Future, Error=Err> { let s3_req = S3Req { method: Method::GET, bucket: None, @@ -379,7 +393,7 @@ impl Client { marker: Option<&str>, delimiter: Option<&str>, max_keys: Option, - ) -> impl Future { + ) -> impl Future { let mut qparams: Values = Values::new(); qparams.set_value("list-type", Some("2".to_string())); if let Some(d) = delimiter { @@ -420,7 +434,7 @@ impl Client { fn run_req_future( req_result: Result, Err>, c: ConnClient, -) -> impl Future, Error = Err> { +) -> impl Future, Error=Err> { future::result(req_result) //.map_err(|e| Err::HttpErr(e)) .and_then(move |req| c.make_req(req).map_err(|e| Err::HyperErr(e))) @@ -475,7 +489,7 @@ impl S3Req { .map(|(key, values)| { values.iter().map(move |value| match value { Some(v) => format!("{}={}", &key, v), - None => format!("{}=", &key,), + None => format!("{}=", &key, ), }) }) .flatten() @@ -486,9 +500,10 @@ impl S3Req { #[cfg(test)] mod minio_tests { - use super::*; use std::collections::HashMap; + use super::*; + #[test] fn serialize_query_parameters() { let mut query_params: HashMap>> = HashMap::new(); diff --git a/src/minio/types.rs b/src/minio/types.rs index a8bd108..790a9ce 100644 --- a/src/minio/types.rs +++ b/src/minio/types.rs @@ -26,6 +26,7 @@ use roxmltree; use std::collections::HashMap; use std::string; use time::{strptime, Tm}; +use crate::minio::xml::S3GenericError; #[derive(Clone)] pub struct Region(String); @@ -60,6 +61,7 @@ pub enum Err { MissingRequiredParams, RawSvcErr(hyper::StatusCode, Response), XmlWriteErr(String), + S3Error(S3GenericError), } pub struct GetObjectResp { diff --git a/src/minio/xml.rs b/src/minio/xml.rs index ac73cc1..25d6866 100644 --- a/src/minio/xml.rs +++ b/src/minio/xml.rs @@ -15,11 +15,17 @@ * limitations under the License. */ +extern crate quick_xml; +extern crate serde; + use std::collections::HashMap; use std::str::FromStr; +use std::str; use hyper::body::Body; use roxmltree; +use serde_derive::Deserialize; +use thiserror::Error; use crate::minio::types::{BucketInfo, Err, ListObjectsResp, ObjectInfo, Region}; use crate::minio::woxml; @@ -39,6 +45,80 @@ pub fn parse_bucket_location(s: String) -> Result { } } +#[allow(non_snake_case)] +#[derive(Debug, Deserialize, PartialEq)] +pub struct Error { + Param: Option, + Code: String, + Message: String, + BucketName: String, + Key: Option, + RequestId: String, + HostId: String, + // Region where the bucket is located. This header is returned + // only in HEAD bucket and ListObjects response. + Region: Option, +} + + +#[derive(Error, Debug, PartialEq)] +pub enum S3GenericError { + #[error("no such bucket: {error:?}")] + NoSuchBucket { + error: Error, + }, + #[error("unknown error: {error:?}")] + Unknown { + error: Error, + }, +} + + +pub(crate) fn parse_s3_error(response_xml: &str) -> S3GenericError { + println!("{}",response_xml); + let doc: Error = quick_xml::de::from_str(response_xml).unwrap(); + match doc.Code.as_str() { + "NoSuchBucket" => { + return S3GenericError::NoSuchBucket { + error: doc, + }; + } + _ => { + return S3GenericError::Unknown { + error: doc, + }; + } + } +} + +#[cfg(test)] +mod xml_tests { + + use super::*; + + #[test] + fn parse_xml_error() { + let response_xml = r#" + + + NoSuchBucket + The specified bucket does not exist + hhhhhhhhhh + aaaa + /aaaa/hhhhhhhhhh + 166B5E4E3A406CC6 + 129c19c9-4cf6-44ff-9f2d-4cb7611be894 + + "#; + + let s3_error = parse_s3_error(response_xml); + + print!("test! {:?}", s3_error); + assert!(matches!(s3_error, S3GenericError::NoSuchBucket {cerror} )); + } +} + + pub fn parse_bucket_list(s: String) -> Result, Err> { let res = roxmltree::Document::parse(&s); match res { @@ -105,16 +185,16 @@ fn get_child_node_or<'a>(node: &'a roxmltree::Node, tag_name: &str, default: &'a } fn parse_child_content(node: &roxmltree::Node, tag: &str) -> Result -where - T: FromStr, + where + T: FromStr, { let content = get_child_node(node, tag).ok_or(Err::XmlElemMissing(format!("{:?}", tag)))?; str::parse::(content).map_err(|_| Err::XmlElemParseErr(format!("{}", tag))) } fn parse_tag_content(node: &roxmltree::Node) -> Result -where - T: FromStr, + where + T: FromStr, { let content = must_get_node_text(node)?; str::parse::(content).map_err(|_| Err::XmlElemParseErr(format!("{:?}", node.tag_name()))) @@ -141,7 +221,7 @@ fn parse_object_infos(node: roxmltree::Node) -> Result, Err> { .children() .filter(|node| node.has_tag_name("StorageClass")); for (key, (mtime, (etag, (size, storage_class)))) in - keys.zip(mtimes.zip(etags.zip(sizes.zip(storage_classes)))) + keys.zip(mtimes.zip(etags.zip(sizes.zip(storage_classes)))) { let sz: i64 = parse_tag_content(&size)?; let key_text = must_get_node_text(&key)?;