Add streaming support to listen_bucket_notification() API (#55)

This commit is contained in:
Aditya Manthramurthy 2023-10-03 10:07:52 -07:00 committed by GitHub
parent c63d3f9350
commit 17a6dead9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 189 additions and 147 deletions

View File

@ -11,37 +11,40 @@ keywords = ["object-storage", "minio", "s3"]
categories = ["api-bindings", "web-programming::http-client"] categories = ["api-bindings", "web-programming::http-client"]
[dependencies] [dependencies]
hyper = { version = "0.14.27", features = ["full"] } async-recursion = "1.0.4"
tokio = { version = "1.32.0", features = ["full"] }
derivative = "2.2.0"
multimap = "0.9.0"
urlencoding = "2.1.3"
lazy_static = "1.4.0"
regex = "1.9.4"
chrono = "0.4.27"
sha2 = "0.10.7"
base64 = "0.21.3" base64 = "0.21.3"
md5 = "0.7.0"
crc = "3.0.1"
byteorder = "1.4.3" byteorder = "1.4.3"
hmac = "0.12.1"
hex = "0.4.3"
futures-core = "0.3.28"
bytes = "1.4.0" bytes = "1.4.0"
futures-util = "0.3.28" chrono = "0.4.27"
xmltree = "0.10.3" crc = "3.0.1"
http = "0.2.9"
dashmap = "5.5.3" dashmap = "5.5.3"
derivative = "2.2.0"
futures-util = "0.3.28"
hex = "0.4.3"
hmac = "0.12.1"
http = "0.2.9"
hyper = { version = "0.14.27", features = ["full"] }
lazy_static = "1.4.0"
md5 = "0.7.0"
multimap = "0.9.0"
os_info = "3.7.0"
rand = "0.8.5" rand = "0.8.5"
regex = "1.9.4"
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105" serde_json = "1.0.105"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } sha2 = "0.10.7"
async-recursion = "1.0.4" tokio = { version = "1.32.0", features = ["full"] }
os_info = "3.7.0" tokio-stream = "0.1.14"
tokio-util = { version = "0.7.8", features = ["io"] }
urlencoding = "2.1.3"
xmltree = "0.10.3"
[dependencies.reqwest] [dependencies.reqwest]
version = "0.11.20" version = "0.11.20"
features = ["native-tls", "blocking", "rustls-tls", "stream"] features = ["native-tls", "blocking", "rustls-tls", "stream"]
[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
[[example]] [[example]]
name = "file-uploader" name = "file-uploader"

View File

@ -19,8 +19,8 @@ use crate::s3::error::Error;
use crate::s3::signer::post_presign_v4; use crate::s3::signer::post_presign_v4;
use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{ use crate::s3::types::{
DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, NotificationRecords, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part,
ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig,
}; };
use crate::s3::utils::{ use crate::s3::utils::{
b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc, b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc,
@ -1262,18 +1262,18 @@ impl<'a> SelectObjectContentArgs<'a> {
} }
/// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API /// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
pub struct ListenBucketNotificationArgs<'a> { #[derive(Clone, Debug)]
pub extra_headers: Option<&'a Multimap>, pub struct ListenBucketNotificationArgs {
pub extra_query_params: Option<&'a Multimap>, pub extra_headers: Option<Multimap>,
pub region: Option<&'a str>, pub extra_query_params: Option<Multimap>,
pub bucket: &'a str, pub region: Option<String>,
pub prefix: Option<&'a str>, pub bucket: String,
pub suffix: Option<&'a str>, pub prefix: Option<String>,
pub events: Option<Vec<&'a str>>, pub suffix: Option<String>,
pub event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync), pub events: Option<Vec<String>>,
} }
impl<'a> ListenBucketNotificationArgs<'a> { impl ListenBucketNotificationArgs {
/// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results. /// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results.
/// ///
/// # Examples /// # Examples
@ -1281,35 +1281,19 @@ impl<'a> ListenBucketNotificationArgs<'a> {
/// ``` /// ```
/// use minio::s3::args::*; /// use minio::s3::args::*;
/// use minio::s3::types::NotificationRecords; /// use minio::s3::types::NotificationRecords;
/// let event_fn = |event: NotificationRecords| { ///
/// for record in event.records.iter() { /// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap();
/// if let Some(s3) = &record.s3 {
/// if let Some(object) = &s3.object {
/// if let Some(key) = &object.key {
/// println!("{:?} {:?}", record.event_name, key);
/// }
/// }
/// }
/// }
/// true
/// };
/// let args = ListenBucketNotificationArgs::new("my-bucket", &event_fn).unwrap();
/// ``` /// ```
pub fn new( pub fn new(bucket_name: &str) -> Result<ListenBucketNotificationArgs, Error> {
bucket_name: &'a str,
event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
) -> Result<ListenBucketNotificationArgs<'a>, Error> {
check_bucket_name(bucket_name, true)?; check_bucket_name(bucket_name, true)?;
Ok(ListenBucketNotificationArgs { Ok(ListenBucketNotificationArgs {
extra_headers: None, extra_headers: None,
extra_query_params: None, extra_query_params: None,
region: None, region: None,
bucket: bucket_name, bucket: bucket_name.to_owned(),
prefix: None, prefix: None,
suffix: None, suffix: None,
events: None, events: None,
event_fn,
}) })
} }
} }

View File

@ -23,8 +23,8 @@ use crate::s3::response::*;
use crate::s3::signer::{presign_v4, sign_v4_s3}; use crate::s3::signer::{presign_v4, sign_v4_s3};
use crate::s3::sse::SseCustomerKey; use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{ use crate::s3::types::{
Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig,
NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig, Part, ReplicationConfig, RetentionMode, SseConfig,
}; };
use crate::s3::utils::{ use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
@ -36,7 +36,7 @@ use dashmap::DashMap;
use hyper::http::Method; use hyper::http::Method;
use os_info; use os_info;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use std::collections::{HashMap, VecDeque}; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::Read; use std::io::Read;
@ -44,6 +44,10 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use xmltree::Element; use xmltree::Element;
mod listen_bucket_notification;
pub use listen_bucket_notification::*;
fn url_decode( fn url_decode(
encoding_type: &Option<String>, encoding_type: &Option<String>,
prefix: Option<String>, prefix: Option<String>,
@ -2429,96 +2433,6 @@ impl Client {
}) })
} }
pub async fn listen_bucket_notification(
&self,
args: &ListenBucketNotificationArgs<'_>,
) -> Result<ListenBucketNotificationResponse, Error> {
if self.base_url.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}
let region = self.get_region(args.bucket, args.region).await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}
let mut resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let mut done = false;
let mut buf = VecDeque::<u8>::new();
while !done {
let chunk = match resp.chunk().await? {
Some(v) => v,
None => {
done = true;
Bytes::new()
}
};
buf.extend(chunk.iter().copied());
while !done {
match buf.iter().position(|&v| v == b'\n') {
Some(i) => {
let mut data = vec![0_u8; i + 1];
#[allow(clippy::needless_range_loop)]
for j in 0..=i {
data[j] = buf.pop_front().ok_or(Error::InsufficientData(i, j))?;
}
let mut line = String::from_utf8(data)?;
line = line.trim().to_string();
if !line.is_empty() {
let records: NotificationRecords = serde_json::from_str(&line)?;
done = !(args.event_fn)(records);
}
}
None => break,
};
}
}
Ok(ListenBucketNotificationResponse::new(
header_map,
&region,
args.bucket,
))
}
/// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API /// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API
pub async fn list_objects_v1( pub async fn list_objects_v1(
&self, &self,

View File

@ -0,0 +1,134 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! MinIO Extension API for S3 Buckets: ListenBucketNotification
use futures_util::stream;
use http::Method;
use tokio::io::AsyncBufReadExt;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;
use crate::s3::{
args::ListenBucketNotificationArgs,
error::Error,
response::ListenBucketNotificationResponse,
types::NotificationRecords,
utils::{merge, Multimap},
};
use super::Client;
impl Client {
/// Listens for bucket notifications. This is MinIO extension API. This
/// function returns a tuple of `ListenBucketNotificationResponse` and a
/// stream of `NotificationRecords`. The former contains the HTTP headers
/// returned by the server and the latter is a stream of notification
/// records. In normal operation (when there are no errors), the stream
/// never ends.
pub async fn listen_bucket_notification(
&self,
args: ListenBucketNotificationArgs,
) -> Result<
(
ListenBucketNotificationResponse,
impl Stream<Item = Result<NotificationRecords, Error>>,
),
Error,
> {
if self.base_url.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}
let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body_stream = resp.bytes_stream();
let body_stream = body_stream
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
let stream_reader = StreamReader::new(body_stream);
let record_stream = Box::pin(stream::unfold(
stream_reader,
move |mut reader| async move {
loop {
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(n) => {
if n == 0 {
return None;
}
let s = line.trim();
if s.is_empty() {
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(&s).map_err(|e| e.into());
return Some((records_res, reader));
}
Err(e) => return Some((Err(e.into()), reader)),
}
}
},
));
Ok((
ListenBucketNotificationResponse::new(header_map, &region, &args.bucket),
record_stream,
))
}
}

View File

@ -15,6 +15,7 @@
use async_std::task; use async_std::task;
use chrono::Duration; use chrono::Duration;
use futures_util::stream::StreamExt;
use hyper::http::Method; use hyper::http::Method;
use minio::s3::types::NotificationRecords; use minio::s3::types::NotificationRecords;
use rand::distributions::{Alphanumeric, DistString}; use rand::distributions::{Alphanumeric, DistString};
@ -564,8 +565,14 @@ impl ClientTest {
false false
}; };
let args = &ListenBucketNotificationArgs::new(&test_bucket, &event_fn).unwrap(); let args = ListenBucketNotificationArgs::new(&test_bucket).unwrap();
client.listen_bucket_notification(args).await.unwrap(); let (_, mut event_stream) = client.listen_bucket_notification(args).await.unwrap();
while let Some(event) = event_stream.next().await {
let event = event.unwrap();
if !event_fn(event) {
break;
}
}
}; };
let spawned_task = task::spawn(listen_task()); let spawned_task = task::spawn(listen_task());