Refactor Notification API. Move code to it's own file. (#6)

* Refactor Notification API. Move code to it's own file.

* Fix missing import

* Remove empty line

* Fix Typo
This commit is contained in:
Daniel Valdivia 2019-07-12 16:23:01 -07:00 committed by Aditya Manthramurthy
parent 891d050e35
commit 1127594f83
2 changed files with 68 additions and 62 deletions

View File

@ -15,12 +15,11 @@
* limitations under the License. * limitations under the License.
*/ */
use std::collections::HashMap;
use std::env; use std::env;
use std::string::String; use std::string::String;
use futures::future::{self, Future}; use futures::future::{self, Future};
use futures::{stream, Stream}; use futures::Stream;
use http; use http;
use hyper::header::{HeaderName, HeaderValue}; use hyper::header::{HeaderName, HeaderValue};
use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Response, Uri}; use hyper::{body::Body, client, header, header::HeaderMap, Method, Request, Response, Uri};
@ -32,11 +31,10 @@ pub use types::BucketInfo;
use types::{Err, GetObjectResp, ListObjectsResp, Region}; use types::{Err, GetObjectResp, ListObjectsResp, Region};
use crate::minio::net::{Values, ValuesAccess}; use crate::minio::net::{Values, ValuesAccess};
use crate::minio::notification::NotificationInfo;
mod api; mod api;
mod api_notification;
mod net; mod net;
mod notification;
mod sign; mod sign;
mod types; mod types;
mod xml; mod xml;
@ -385,64 +383,6 @@ impl Client {
.and_then(|s| xml::parse_list_objects(s)) .and_then(|s| xml::parse_list_objects(s))
}) })
} }
/// listen_bucket_notificaion - Get bucket notifications for the bucket_name.
pub fn listen_bucket_notificaion(
&self,
bucket_name: &str,
prefix: Option<String>,
suffix: Option<String>,
events: Vec<String>,
) -> impl Stream<Item = notification::NotificationInfo, Error = Err> {
// Prepare request query parameters
let mut query_params: Values = Values::new();
query_params.set_value("prefix", prefix);
query_params.set_value("suffix", suffix);
let opt_events: Vec<Option<String>> = events.into_iter().map(|evt| Some(evt)).collect();
query_params.insert("events".to_string(), opt_events);
// build signed request
let s3_req = S3Req {
method: Method::GET,
bucket: Some(bucket_name.to_string()),
object: None,
headers: HeaderMap::new(),
query: query_params,
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req, Ok(Body::empty()))
.map(|resp| {
// Read the whole body for bucket location response.
resp.into_body()
.map_err(|e| Err::HyperErr(e))
.filter(|c| {
// filter out white spaces sent by the server to indicate it's still alive
c[0] != SPACE_BYTE[0]
})
.map(|chunk| {
// Split the chunk by lines and process.
// TODO: Handle case when partial lines are present in the chunk
let chunk_lines = String::from_utf8(chunk.to_vec())
.map(|p| {
let lines =
p.lines().map(|s| s.to_string()).collect::<Vec<String>>();
stream::iter_ok(lines.into_iter())
})
.map_err(|e| Err::Utf8DecodingErr(e));
futures::future::result(chunk_lines).flatten_stream()
})
.flatten()
.map(|line| {
// Deserialize the notification
let notification_info: NotificationInfo =
serde_json::from_str(&line).unwrap();
notification_info
})
})
.flatten_stream()
}
} }
fn run_req_future( fn run_req_future(
@ -515,6 +455,7 @@ impl S3Req {
#[cfg(test)] #[cfg(test)]
mod minio_tests { mod minio_tests {
use super::*; use super::*;
use std::collections::HashMap;
#[test] #[test]
fn serialize_query_parameters() { fn serialize_query_parameters() {

View File

@ -17,6 +17,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::minio::net::{Values, ValuesAccess};
use crate::minio::{Client, Err, S3Req, SPACE_BYTE};
use futures::future::Future;
use futures::{stream, Stream};
use hyper::{Body, HeaderMap, Method};
use serde_derive::Deserialize; use serde_derive::Deserialize;
/// Notification event object metadata. /// Notification event object metadata.
@ -110,3 +115,63 @@ pub struct NotificationInfo {
pub records: Vec<NotificationEvent>, pub records: Vec<NotificationEvent>,
pub err: Option<String>, pub err: Option<String>,
} }
impl Client {
/// listen_bucket_notificaion - Get bucket notifications for the bucket_name.
pub fn listen_bucket_notification(
&self,
bucket_name: &str,
prefix: Option<String>,
suffix: Option<String>,
events: Vec<String>,
) -> impl Stream<Item = NotificationInfo, Error = Err> {
// Prepare request query parameters
let mut query_params: Values = Values::new();
query_params.set_value("prefix", prefix);
query_params.set_value("suffix", suffix);
let opt_events: Vec<Option<String>> = events.into_iter().map(|evt| Some(evt)).collect();
query_params.insert("events".to_string(), opt_events);
// build signed request
let s3_req = S3Req {
method: Method::GET,
bucket: Some(bucket_name.to_string()),
object: None,
headers: HeaderMap::new(),
query: query_params,
body: Body::empty(),
ts: time::now_utc(),
};
self.signed_req_future(s3_req, Ok(Body::empty()))
.map(|resp| {
// Read the whole body for bucket location response.
resp.into_body()
.map_err(|e| Err::HyperErr(e))
.filter(|c| {
// filter out white spaces sent by the server to indicate it's still alive
c[0] != SPACE_BYTE[0]
})
.map(|chunk| {
// Split the chunk by lines and process.
// TODO: Handle case when partial lines are present in the chunk
let chunk_lines = String::from_utf8(chunk.to_vec())
.map(|p| {
let lines =
p.lines().map(|s| s.to_string()).collect::<Vec<String>>();
stream::iter_ok(lines.into_iter())
})
.map_err(|e| Err::Utf8DecodingErr(e));
futures::future::result(chunk_lines).flatten_stream()
})
.flatten()
.map(|line| {
// Deserialize the notification
let notification_info: NotificationInfo =
serde_json::from_str(&line).unwrap();
notification_info
})
})
.flatten_stream()
}
}