Add listen_bucket_notification() API (#15)

Signed-off-by: Bala.FA <bala@minio.io>
This commit is contained in:
Bala FA 2022-08-31 20:50:33 +05:30 committed by GitHub
parent f88c9dd919
commit c21dda1492
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 406 additions and 39 deletions

View File

@ -27,6 +27,9 @@ xmltree = "0.10.3"
http = "0.2.8" http = "0.2.8"
dashmap = "5.3.4" dashmap = "5.3.4"
rand = "0.8.5" rand = "0.8.5"
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
[dependencies.reqwest] [dependencies.reqwest]
version = "0.11.11" version = "0.11.11"

View File

@ -15,7 +15,7 @@
use crate::s3::error::Error; use crate::s3::error::Error;
use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::{DeleteObject, Item, Part, Retention, SelectRequest}; use crate::s3::types::{DeleteObject, Item, NotificationRecords, Part, Retention, SelectRequest};
use crate::s3::utils::{ use crate::s3::utils::{
check_bucket_name, merge, to_http_header_value, to_iso8601utc, urlencode, Multimap, UtcTime, check_bucket_name, merge, to_http_header_value, to_iso8601utc, urlencode, Multimap, UtcTime,
}; };
@ -979,3 +979,34 @@ impl<'a> SelectObjectContentArgs<'a> {
}) })
} }
} }
pub struct ListenBucketNotificationArgs<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub prefix: Option<&'a str>,
pub suffix: Option<&'a str>,
pub events: Option<Vec<&'a str>>,
pub event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
}
impl<'a> ListenBucketNotificationArgs<'a> {
pub fn new(
bucket_name: &'a str,
event_fn: &'a (dyn Fn(NotificationRecords) -> bool + Send + Sync),
) -> Result<ListenBucketNotificationArgs<'a>, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListenBucketNotificationArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
prefix: None,
suffix: None,
events: None,
event_fn: event_fn,
})
}
}

View File

@ -20,7 +20,7 @@ use crate::s3::http::{BaseUrl, Url};
use crate::s3::response::*; use crate::s3::response::*;
use crate::s3::signer::sign_v4_s3; use crate::s3::signer::sign_v4_s3;
use crate::s3::sse::SseCustomerKey; use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{Bucket, DeleteObject, Item, Part}; use crate::s3::types::{Bucket, DeleteObject, Item, NotificationRecords, Part};
use crate::s3::utils::{ use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
to_amz_date, urldecode, utc_now, Multimap, to_amz_date, urldecode, utc_now, Multimap,
@ -29,7 +29,7 @@ use bytes::{Buf, Bytes};
use dashmap::DashMap; use dashmap::DashMap;
use hyper::http::Method; use hyper::http::Method;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use std::collections::HashMap; use std::collections::{HashMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::io::Read; use std::io::Read;
use xmltree::Element; use xmltree::Element;
@ -203,7 +203,7 @@ fn parse_list_objects_common_prefixes(
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct Client<'a> { pub struct Client<'a> {
base_url: BaseUrl, base_url: BaseUrl,
provider: Option<&'a dyn Provider>, provider: Option<&'a (dyn Provider + Send + Sync)>,
pub ssl_cert_file: String, pub ssl_cert_file: String,
pub ignore_cert_check: bool, pub ignore_cert_check: bool,
pub user_agent: String, pub user_agent: String,
@ -212,7 +212,7 @@ pub struct Client<'a> {
} }
impl<'a> Client<'a> { impl<'a> Client<'a> {
pub fn new(base_url: BaseUrl, provider: Option<&dyn Provider>) -> Client { pub fn new(base_url: BaseUrl, provider: Option<&(dyn Provider + Send + Sync)>) -> Client {
Client { Client {
base_url: base_url, base_url: base_url,
provider: provider, provider: provider,
@ -458,15 +458,18 @@ impl<'a> Client<'a> {
.build_url(&method, region, query_params, bucket_name, object_name)?; .build_url(&method, region, query_params, bucket_name, object_name)?;
self.build_headers(headers, query_params, region, &url, &method, body); self.build_headers(headers, query_params, region, &url, &method, body);
let mut buf = Vec::new(); let mut builder = reqwest::Client::builder().no_gzip();
File::open(self.ssl_cert_file.to_string())?.read_to_end(&mut buf)?; if self.ignore_cert_check {
let cert = reqwest::Certificate::from_pem(&buf)?; builder = builder.danger_accept_invalid_certs(self.ignore_cert_check);
}
if !self.ssl_cert_file.is_empty() {
let mut buf = Vec::new();
File::open(self.ssl_cert_file.to_string())?.read_to_end(&mut buf)?;
let cert = reqwest::Certificate::from_pem(&buf)?;
builder = builder.add_root_certificate(cert);
}
let client = reqwest::Client::builder() let client = builder.build()?;
.no_gzip()
.add_root_certificate(cert)
.danger_accept_invalid_certs(self.ignore_cert_check)
.build()?;
let mut req = client.request(method.clone(), url.to_string()); let mut req = client.request(method.clone(), url.to_string());
@ -925,8 +928,95 @@ impl<'a> Client<'a> {
}) })
} }
// ListenBucketNotificationResponse ListenBucketNotification( pub async fn listen_bucket_notification(
// ListenBucketNotificationArgs args); &self,
args: &ListenBucketNotificationArgs<'_>,
) -> Result<ListenBucketNotificationResponse, Error> {
if self.base_url.aws_host {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}
let region = self.get_region(&args.bucket, args.region).await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}
let mut resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let mut done = false;
let mut buf = VecDeque::<u8>::new();
while !done {
let chunk = match resp.chunk().await? {
Some(v) => v,
None => {
done = true;
Bytes::new()
}
};
buf.extend(chunk.iter().copied());
while !done {
match buf.iter().position(|&v| v == '\n' as u8) {
Some(i) => {
let mut data = vec![0_u8; i + 1];
for j in 0..=i {
data[j] = buf.pop_front().ok_or(Error::InsufficientData(i, j))?;
}
let mut line = String::from_utf8(data)?;
line = line.trim().to_string();
if !line.is_empty() {
let records: NotificationRecords = serde_json::from_str(&line)?;
done = !(args.event_fn)(records);
}
}
None => break,
};
}
}
Ok(ListenBucketNotificationResponse::new(
header_map,
&region,
&args.bucket,
))
}
pub async fn list_objects_v1( pub async fn list_objects_v1(
&self, &self,
args: &ListObjectsV1Args<'_>, args: &ListObjectsV1Args<'_>,

View File

@ -60,6 +60,7 @@ pub enum Error {
IntError(std::num::ParseIntError), IntError(std::num::ParseIntError),
BoolError(std::str::ParseBoolError), BoolError(std::str::ParseBoolError),
Utf8Error(alloc::string::FromUtf8Error), Utf8Error(alloc::string::FromUtf8Error),
JsonError(serde_json::Error),
XmlError(String), XmlError(String),
InvalidBucketName(String), InvalidBucketName(String),
InvalidBaseUrl(String), InvalidBaseUrl(String),
@ -86,6 +87,7 @@ pub enum Error {
CrcMismatch(String, u32, u32), CrcMismatch(String, u32, u32),
UnknownEventType(String), UnknownEventType(String),
SelectError(String, String), SelectError(String, String),
UnsupportedApi(String),
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}
@ -102,6 +104,7 @@ impl fmt::Display for Error {
Error::IntError(e) => write!(f, "{}", e), Error::IntError(e) => write!(f, "{}", e),
Error::BoolError(e) => write!(f, "{}", e), Error::BoolError(e) => write!(f, "{}", e),
Error::Utf8Error(e) => write!(f, "{}", e), Error::Utf8Error(e) => write!(f, "{}", e),
Error::JsonError(e) => write!(f, "{}", e),
Error::XmlError(m) => write!(f, "{}", m), Error::XmlError(m) => write!(f, "{}", m),
Error::InvalidBucketName(m) => write!(f, "{}", m), Error::InvalidBucketName(m) => write!(f, "{}", m),
Error::InvalidObjectName(m) => write!(f, "{}", m), Error::InvalidObjectName(m) => write!(f, "{}", m),
@ -128,6 +131,7 @@ impl fmt::Display for Error {
Error::CrcMismatch(t, e, g) => write!(f, "{} CRC mismatch; expected: {}, got: {}", t, e, g), Error::CrcMismatch(t, e, g) => write!(f, "{} CRC mismatch; expected: {}, got: {}", t, e, g),
Error::UnknownEventType(et) => write!(f, "unknown event type {}", et), Error::UnknownEventType(et) => write!(f, "unknown event type {}", et),
Error::SelectError(ec, em) => write!(f, "error code: {}, error message: {}", ec, em), Error::SelectError(ec, em) => write!(f, "error code: {}, error message: {}", ec, em),
Error::UnsupportedApi(a) => write!(f, "{} API is not supported in Amazon AWS S3", a),
} }
} }
} }
@ -185,3 +189,9 @@ impl From<alloc::string::FromUtf8Error> for Error {
Error::Utf8Error(err) Error::Utf8Error(err)
} }
} }
impl From<serde_json::Error> for Error {
fn from(err: serde_json::Error) -> Self {
Error::JsonError(err)
}
}

View File

@ -96,7 +96,7 @@ pub struct BaseUrl {
host: String, host: String,
port: u16, port: u16,
pub region: String, pub region: String,
aws_host: bool, pub aws_host: bool,
accelerate_host: bool, accelerate_host: bool,
dualstack_host: bool, dualstack_host: bool,
virtual_style: bool, virtual_style: bool,

View File

@ -594,3 +594,23 @@ impl SelectObjectContentResponse {
} }
} }
} }
pub struct ListenBucketNotificationResponse {
pub headers: HeaderMap,
pub region: String,
pub bucket_name: String,
}
impl ListenBucketNotificationResponse {
pub fn new(
headers: HeaderMap,
region: &str,
bucket_name: &str,
) -> ListenBucketNotificationResponse {
ListenBucketNotificationResponse {
headers: headers,
region: region.to_string(),
bucket_name: bucket_name.to_string(),
}
}
}

View File

@ -15,6 +15,7 @@
use crate::s3::error::Error; use crate::s3::error::Error;
use crate::s3::utils::UtcTime; use crate::s3::utils::UtcTime;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
@ -457,3 +458,111 @@ pub struct SelectProgress {
pub bytes_progressed: usize, pub bytes_progressed: usize,
pub bytes_returned: usize, pub bytes_returned: usize,
} }
#[derive(Debug, Deserialize, Serialize)]
pub struct UserIdentity {
#[serde(alias = "principalId")]
pub principal_id: Option<String>,
}
pub type OwnerIdentity = UserIdentity;
#[derive(Debug, Deserialize, Serialize)]
pub struct RequestParameters {
#[serde(alias = "principalId")]
pub principal_id: Option<String>,
#[serde(alias = "region")]
pub region: Option<String>,
#[serde(alias = "sourceIPAddress")]
pub source_ip_address: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct ResponseElements {
#[serde(alias = "content-length")]
pub content_length: Option<String>,
#[serde(alias = "x-amz-request-id")]
pub x_amz_request_id: Option<String>,
#[serde(alias = "x-minio-deployment-id")]
pub x_minio_deployment_id: Option<String>,
#[serde(alias = "x-minio-origin-endpoint")]
pub x_minio_origin_endpoint: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct S3Bucket {
#[serde(alias = "name")]
pub name: Option<String>,
#[serde(alias = "arn")]
pub arn: Option<String>,
#[serde(alias = "ownerIdentity")]
pub owner_identity: Option<OwnerIdentity>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct S3Object {
#[serde(alias = "key")]
pub key: Option<String>,
#[serde(alias = "size")]
pub size: Option<usize>,
#[serde(alias = "eTag")]
pub etag: Option<String>,
#[serde(alias = "contentType")]
pub content_type: Option<String>,
#[serde(alias = "userMetadata")]
pub user_metadata: Option<HashMap<String, String>>,
#[serde(alias = "sequencer")]
pub sequencer: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct S3 {
#[serde(alias = "s3SchemaVersion")]
pub s3_schema_version: Option<String>,
#[serde(alias = "configurationId")]
pub configuration_id: Option<String>,
#[serde(alias = "bucket")]
pub bucket: Option<S3Bucket>,
#[serde(alias = "object")]
pub object: Option<S3Object>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Source {
#[serde(alias = "host")]
pub host: Option<String>,
#[serde(alias = "port")]
pub port: Option<String>,
#[serde(alias = "userAgent")]
pub user_agent: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct NotificationRecord {
#[serde(alias = "eventVersion")]
pub event_version: Option<String>,
#[serde(alias = "eventSource")]
pub event_source: Option<String>,
#[serde(alias = "awsRegion")]
pub aws_region: Option<String>,
#[serde(alias = "eventTime")]
pub event_time: Option<String>,
#[serde(alias = "eventName")]
pub event_name: Option<String>,
#[serde(alias = "userIdentity")]
pub user_identity: Option<UserIdentity>,
#[serde(alias = "requestParameters")]
pub request_parameters: Option<RequestParameters>,
#[serde(alias = "responseElements")]
pub response_elements: Option<ResponseElements>,
#[serde(alias = "s3")]
pub s3: Option<S3>,
#[serde(alias = "source")]
pub source: Option<Source>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct NotificationRecords {
#[serde(alias = "Records")]
pub records: Vec<NotificationRecord>,
}

View File

@ -13,8 +13,11 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use async_std::task;
use minio::s3::types::NotificationRecords;
use rand::distributions::{Alphanumeric, DistString}; use rand::distributions::{Alphanumeric, DistString};
use std::io::BufReader; use std::io::BufReader;
use tokio::sync::mpsc;
use minio::s3::args::*; use minio::s3::args::*;
use minio::s3::client::Client; use minio::s3::client::Client;
@ -64,18 +67,53 @@ fn rand_object_name() -> String {
} }
struct ClientTest<'a> { struct ClientTest<'a> {
client: &'a Client<'a>, base_url: BaseUrl,
access_key: String,
secret_key: String,
ignore_cert_check: bool,
ssl_cert_file: String,
client: Client<'a>,
test_bucket: String, test_bucket: String,
} }
impl<'a> ClientTest<'a> { impl<'a> ClientTest<'_> {
fn new(client: &'a Client<'_>, test_bucket: &'a str) -> ClientTest<'a> { fn new(
base_url: BaseUrl,
access_key: String,
secret_key: String,
static_provider: &'a StaticProvider,
ignore_cert_check: bool,
ssl_cert_file: String,
) -> ClientTest<'a> {
let mut client = Client::new(base_url.clone(), Some(static_provider));
client.ignore_cert_check = ignore_cert_check;
client.ssl_cert_file = ssl_cert_file.to_string();
ClientTest { ClientTest {
base_url: base_url,
access_key: access_key,
secret_key: secret_key,
ignore_cert_check: ignore_cert_check,
ssl_cert_file: ssl_cert_file,
client: client, client: client,
test_bucket: test_bucket.to_string(), test_bucket: rand_bucket_name(),
} }
} }
async fn init(&self) {
self.client
.make_bucket(&MakeBucketArgs::new(&self.test_bucket).unwrap())
.await
.unwrap();
}
async fn drop(&self) {
self.client
.remove_bucket(&RemoveBucketArgs::new(&self.test_bucket).unwrap())
.await
.unwrap();
}
async fn bucket_exists(&self) { async fn bucket_exists(&self) {
let bucket_name = rand_bucket_name(); let bucket_name = rand_bucket_name();
self.client self.client
@ -392,6 +430,74 @@ impl<'a> ClientTest<'a> {
.await .await
.unwrap(); .unwrap();
} }
async fn listen_bucket_notification(&self) {
let object_name = rand_object_name();
let name = object_name.clone();
let (sender, mut receiver): (mpsc::UnboundedSender<bool>, mpsc::UnboundedReceiver<bool>) =
mpsc::unbounded_channel();
let access_key = self.access_key.clone();
let secret_key = self.secret_key.clone();
let base_url = self.base_url.clone();
let ignore_cert_check = self.ignore_cert_check;
let ssl_cert_file = self.ssl_cert_file.clone();
let test_bucket = self.test_bucket.clone();
let listen_task = move || async move {
let static_provider = StaticProvider::new(&access_key, &secret_key, None);
let mut client = Client::new(base_url, Some(&static_provider));
client.ignore_cert_check = ignore_cert_check;
client.ssl_cert_file = ssl_cert_file;
let event_fn = |event: NotificationRecords| {
for record in event.records.iter() {
if let Some(s3) = &record.s3 {
if let Some(object) = &s3.object {
if let Some(key) = &object.key {
if name == *key {
sender.send(true).unwrap();
}
return false;
}
}
}
}
sender.send(false).unwrap();
return false;
};
let args = &ListenBucketNotificationArgs::new(&test_bucket, &event_fn).unwrap();
client.listen_bucket_notification(&args).await.unwrap();
};
let spawned_task = task::spawn(listen_task());
task::sleep(std::time::Duration::from_millis(100)).await;
let size = 16_usize;
self.client
.put_object(
&mut PutObjectArgs::new(
&self.test_bucket,
&object_name,
&mut RandReader::new(size),
Some(size),
None,
)
.unwrap(),
)
.await
.unwrap();
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await
.unwrap();
spawned_task.await;
assert_eq!(receiver.recv().await.unwrap(), true);
}
} }
#[tokio::main] #[tokio::main]
@ -405,24 +511,22 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let ignore_cert_check = std::env::var("IGNORE_CERT_CHECK").is_ok(); let ignore_cert_check = std::env::var("IGNORE_CERT_CHECK").is_ok();
let region = std::env::var("SERVER_REGION").ok(); let region = std::env::var("SERVER_REGION").ok();
let mut burl = BaseUrl::from_string(host).unwrap(); let mut base_url = BaseUrl::from_string(host).unwrap();
burl.https = secure; base_url.https = secure;
if let Some(v) = region { if let Some(v) = region {
burl.region = v; base_url.region = v;
} }
let provider = StaticProvider::new(&access_key, &secret_key, None); let static_provider = StaticProvider::new(&access_key, &secret_key, None);
let mut client = Client::new(burl.clone(), Some(&provider)); let ctest = ClientTest::new(
client.ignore_cert_check = ignore_cert_check; base_url,
client.ssl_cert_file = ssl_cert_file; access_key,
secret_key,
let test_bucket = rand_bucket_name(); &static_provider,
client ignore_cert_check,
.make_bucket(&MakeBucketArgs::new(&test_bucket).unwrap()) ssl_cert_file,
.await );
.unwrap(); ctest.init().await;
let ctest = ClientTest::new(&client, &test_bucket);
println!("make_bucket() + bucket_exists() + remove_bucket()"); println!("make_bucket() + bucket_exists() + remove_bucket()");
ctest.bucket_exists().await; ctest.bucket_exists().await;
@ -448,10 +552,10 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("select_object_content()"); println!("select_object_content()");
ctest.select_object_content().await; ctest.select_object_content().await;
client println!("listen_bucket_notification()");
.remove_bucket(&RemoveBucketArgs::new(&test_bucket).unwrap()) ctest.listen_bucket_notification().await;
.await
.unwrap(); ctest.drop().await;
Ok(()) Ok(())
} }