From 1127594f83e773026f6e4d3241a73544ce0cbff8 Mon Sep 17 00:00:00 2001 From: Daniel Valdivia Date: Fri, 12 Jul 2019 16:23:01 -0700 Subject: [PATCH] Refactor Notification API. Move code to it's own file. (#6) * Refactor Notification API. Move code to it's own file. * Fix missing import * Remove empty line * Fix Typo --- src/minio.rs | 65 +------------------ .../{notification.rs => api_notification.rs} | 65 +++++++++++++++++++ 2 files changed, 68 insertions(+), 62 deletions(-) rename src/minio/{notification.rs => api_notification.rs} (58%) diff --git a/src/minio.rs b/src/minio.rs index f15ee74..a8b94f6 100644 --- a/src/minio.rs +++ b/src/minio.rs @@ -15,12 +15,11 @@ * limitations under the License. */ -use std::collections::HashMap; use std::env; use std::string::String; use futures::future::{self, Future}; -use futures::{stream, Stream}; +use futures::Stream; use http; use hyper::header::{HeaderName, HeaderValue}; use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Response, Uri}; @@ -32,11 +31,10 @@ pub use types::BucketInfo; use types::{Err, GetObjectResp, ListObjectsResp, Region}; use crate::minio::net::{Values, ValuesAccess}; -use crate::minio::notification::NotificationInfo; mod api; +mod api_notification; mod net; -mod notification; mod sign; mod types; mod xml; @@ -385,64 +383,6 @@ impl Client { .and_then(|s| xml::parse_list_objects(s)) }) } - - /// listen_bucket_notificaion - Get bucket notifications for the bucket_name. - pub fn listen_bucket_notificaion( - &self, - bucket_name: &str, - prefix: Option, - suffix: Option, - events: Vec, - ) -> impl Stream { - // Prepare request query parameters - let mut query_params: Values = Values::new(); - query_params.set_value("prefix", prefix); - query_params.set_value("suffix", suffix); - let opt_events: Vec> = events.into_iter().map(|evt| Some(evt)).collect(); - query_params.insert("events".to_string(), opt_events); - - // build signed request - let s3_req = S3Req { - method: Method::GET, - bucket: Some(bucket_name.to_string()), - object: None, - headers: HeaderMap::new(), - query: query_params, - body: Body::empty(), - ts: time::now_utc(), - }; - - self.signed_req_future(s3_req, Ok(Body::empty())) - .map(|resp| { - // Read the whole body for bucket location response. - resp.into_body() - .map_err(|e| Err::HyperErr(e)) - .filter(|c| { - // filter out white spaces sent by the server to indicate it's still alive - c[0] != SPACE_BYTE[0] - }) - .map(|chunk| { - // Split the chunk by lines and process. - // TODO: Handle case when partial lines are present in the chunk - let chunk_lines = String::from_utf8(chunk.to_vec()) - .map(|p| { - let lines = - p.lines().map(|s| s.to_string()).collect::>(); - stream::iter_ok(lines.into_iter()) - }) - .map_err(|e| Err::Utf8DecodingErr(e)); - futures::future::result(chunk_lines).flatten_stream() - }) - .flatten() - .map(|line| { - // Deserialize the notification - let notification_info: NotificationInfo = - serde_json::from_str(&line).unwrap(); - notification_info - }) - }) - .flatten_stream() - } } fn run_req_future( @@ -515,6 +455,7 @@ impl S3Req { #[cfg(test)] mod minio_tests { use super::*; + use std::collections::HashMap; #[test] fn serialize_query_parameters() { diff --git a/src/minio/notification.rs b/src/minio/api_notification.rs similarity index 58% rename from src/minio/notification.rs rename to src/minio/api_notification.rs index 93b88cf..ebc7e52 100644 --- a/src/minio/notification.rs +++ b/src/minio/api_notification.rs @@ -17,6 +17,11 @@ use std::collections::HashMap; +use crate::minio::net::{Values, ValuesAccess}; +use crate::minio::{Client, Err, S3Req, SPACE_BYTE}; +use futures::future::Future; +use futures::{stream, Stream}; +use hyper::{Body, HeaderMap, Method}; use serde_derive::Deserialize; /// Notification event object metadata. @@ -110,3 +115,63 @@ pub struct NotificationInfo { pub records: Vec, pub err: Option, } + +impl Client { + /// listen_bucket_notificaion - Get bucket notifications for the bucket_name. + pub fn listen_bucket_notification( + &self, + bucket_name: &str, + prefix: Option, + suffix: Option, + events: Vec, + ) -> impl Stream { + // Prepare request query parameters + let mut query_params: Values = Values::new(); + query_params.set_value("prefix", prefix); + query_params.set_value("suffix", suffix); + let opt_events: Vec> = events.into_iter().map(|evt| Some(evt)).collect(); + query_params.insert("events".to_string(), opt_events); + + // build signed request + let s3_req = S3Req { + method: Method::GET, + bucket: Some(bucket_name.to_string()), + object: None, + headers: HeaderMap::new(), + query: query_params, + body: Body::empty(), + ts: time::now_utc(), + }; + + self.signed_req_future(s3_req, Ok(Body::empty())) + .map(|resp| { + // Read the whole body for bucket location response. + resp.into_body() + .map_err(|e| Err::HyperErr(e)) + .filter(|c| { + // filter out white spaces sent by the server to indicate it's still alive + c[0] != SPACE_BYTE[0] + }) + .map(|chunk| { + // Split the chunk by lines and process. + // TODO: Handle case when partial lines are present in the chunk + let chunk_lines = String::from_utf8(chunk.to_vec()) + .map(|p| { + let lines = + p.lines().map(|s| s.to_string()).collect::>(); + stream::iter_ok(lines.into_iter()) + }) + .map_err(|e| Err::Utf8DecodingErr(e)); + futures::future::result(chunk_lines).flatten_stream() + }) + .flatten() + .map(|line| { + // Deserialize the notification + let notification_info: NotificationInfo = + serde_json::from_str(&line).unwrap(); + notification_info + }) + }) + .flatten_stream() + } +}