Add builder for ListenBucketNotification (#75)

- Also update the types used in NotificationRecords
This commit is contained in:
Aditya Manthramurthy 2024-04-02 17:55:38 -07:00 committed by GitHub
parent fc20535f1d
commit 6a34d4c677
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 392 additions and 257 deletions

View File

@ -1054,43 +1054,6 @@ impl<'a> SelectObjectContentArgs<'a> {
}
}
/// Argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
#[derive(Clone, Debug)]
pub struct ListenBucketNotificationArgs {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub prefix: Option<String>,
pub suffix: Option<String>,
pub events: Option<Vec<String>>,
}
impl ListenBucketNotificationArgs {
/// Returns argument for [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API with given bucket name and callback function for results.
///
/// # Examples
///
/// ```
/// use minio::s3::args::*;
/// use minio::s3::types::NotificationRecords;
///
/// let args = ListenBucketNotificationArgs::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &str) -> Result<ListenBucketNotificationArgs, Error> {
check_bucket_name(bucket_name, true)?;
Ok(ListenBucketNotificationArgs {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name.to_owned(),
prefix: None,
suffix: None,
events: None,
})
}
}
#[derive(Clone, Debug, Default)]
/// Argument for [upload_part_copy()](crate::s3::client::Client::upload_part_copy) S3 API
pub struct UploadPartCopyArgs<'a> {

View File

@ -13,5 +13,7 @@
//! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs
mod list_objects;
mod listen_bucket_notification;
pub use list_objects::*;
pub use listen_bucket_notification::*;

View File

@ -0,0 +1,139 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use futures_util::Stream;
use http::Method;
use crate::s3::{
client::Client,
error::Error,
response::ListenBucketNotificationResponse,
types::{NotificationRecords, S3Api, S3Request, ToS3Request},
utils::{check_bucket_name, merge, Multimap},
};
/// Argument builder for
/// [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification)
/// API.
#[derive(Clone, Debug, Default)]
pub struct ListenBucketNotification {
client: Option<Client>,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
prefix: Option<String>,
suffix: Option<String>,
events: Option<Vec<String>>,
}
#[async_trait]
impl S3Api for ListenBucketNotification {
type S3Response = (
ListenBucketNotificationResponse,
Box<dyn Stream<Item = Result<NotificationRecords, Error>> + Unpin + Send>,
);
}
impl ToS3Request for ListenBucketNotification {
fn to_s3request(&self) -> Result<S3Request, Error> {
let client = self.client.as_ref().ok_or(Error::NoClientProvided)?;
if client.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}
check_bucket_name(&self.bucket, true)?;
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = &self.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = &self.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &self.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}
let req = S3Request::new(client, Method::GET)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}
impl ListenBucketNotification {
pub fn new(bucket_name: &str) -> ListenBucketNotification {
ListenBucketNotification {
bucket: bucket_name.to_owned(),
..Default::default()
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
pub fn prefix(mut self, prefix: Option<String>) -> Self {
self.prefix = prefix;
self
}
pub fn suffix(mut self, suffix: Option<String>) -> Self {
self.suffix = suffix;
self
}
pub fn events(mut self, events: Option<Vec<String>>) -> Self {
self.events = events;
self
}
}

View File

@ -174,6 +174,10 @@ impl Client {
.build()
}
pub fn is_aws_host(&self) -> bool {
self.base_url.is_aws_host()
}
fn build_headers(
&self,
headers: &mut Multimap,

View File

@ -15,19 +15,7 @@
//! MinIO Extension API for S3 Buckets: ListenBucketNotification
use futures_util::stream;
use http::Method;
use tokio::io::AsyncBufReadExt;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;
use crate::s3::{
args::ListenBucketNotificationArgs,
error::Error,
response::ListenBucketNotificationResponse,
types::NotificationRecords,
utils::{merge, Multimap},
};
use crate::s3::builders::ListenBucketNotification;
use super::Client;
@ -38,97 +26,7 @@ impl Client {
/// returned by the server and the latter is a stream of notification
/// records. In normal operation (when there are no errors), the stream
/// never ends.
pub async fn listen_bucket_notification(
&self,
args: ListenBucketNotificationArgs,
) -> Result<
(
ListenBucketNotificationResponse,
impl Stream<Item = Result<NotificationRecords, Error>>,
),
Error,
> {
if self.base_url.is_aws_host() {
return Err(Error::UnsupportedApi(String::from(
"ListenBucketNotification",
)));
}
let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;
let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
merge(&mut headers, v);
}
let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = args.prefix {
query_params.insert(String::from("prefix"), v.to_string());
}
if let Some(v) = args.suffix {
query_params.insert(String::from("suffix"), v.to_string());
}
if let Some(v) = &args.events {
for e in v.iter() {
query_params.insert(String::from("events"), e.to_string());
}
} else {
query_params.insert(String::from("events"), String::from("s3:ObjectCreated:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectRemoved:*"));
query_params.insert(String::from("events"), String::from("s3:ObjectAccessed:*"));
}
let resp = self
.execute(
Method::GET,
&region,
&mut headers,
&query_params,
Some(&args.bucket),
None,
None,
)
.await?;
let header_map = resp.headers().clone();
let body_stream = resp.bytes_stream();
let body_stream = body_stream
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
let stream_reader = StreamReader::new(body_stream);
let record_stream = Box::pin(stream::unfold(
stream_reader,
move |mut reader| async move {
loop {
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(n) => {
if n == 0 {
return None;
}
let s = line.trim();
if s.is_empty() {
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(s).map_err(|e| e.into());
return Some((records_res, reader));
}
Err(e) => return Some((Err(e.into()), reader)),
}
}
},
));
Ok((
ListenBucketNotificationResponse::new(header_map, &region, &args.bucket),
record_stream,
))
pub fn listen_bucket_notification(&self, bucket: &str) -> ListenBucketNotification {
ListenBucketNotification::new(bucket).client(self)
}
}

View File

@ -32,10 +32,12 @@ use crate::s3::utils::{
};
mod list_objects;
mod listen_bucket_notification;
pub use list_objects::{
ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response,
};
pub use listen_bucket_notification::ListenBucketNotificationResponse;
#[derive(Debug)]
/// Response of [list_buckets()](crate::s3::client::Client::list_buckets) API
@ -566,28 +568,6 @@ impl SelectObjectContentResponse {
}
}
#[derive(Clone, Debug)]
/// Response of [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification) API
pub struct ListenBucketNotificationResponse {
pub headers: HeaderMap,
pub region: String,
pub bucket_name: String,
}
impl ListenBucketNotificationResponse {
pub fn new(
headers: HeaderMap,
region: &str,
bucket_name: &str,
) -> ListenBucketNotificationResponse {
ListenBucketNotificationResponse {
headers,
region: region.to_string(),
bucket_name: bucket_name.to_string(),
}
}
}
/// Response of [delete_bucket_encryption()](crate::s3::client::Client::delete_bucket_encryption) API
pub type DeleteBucketEncryptionResponse = BucketResponse;

View File

@ -0,0 +1,87 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use futures_util::{stream, Stream, StreamExt};
use http::HeaderMap;
use tokio::io::AsyncBufReadExt;
use tokio_util::io::StreamReader;
use crate::s3::{
error::Error,
types::{FromS3Response, NotificationRecords, S3Request},
};
/// Response of
/// [listen_bucket_notification()](crate::s3::client::Client::listen_bucket_notification)
/// API
#[derive(Debug)]
pub struct ListenBucketNotificationResponse {
pub headers: HeaderMap,
pub region: String,
pub bucket: String,
}
#[async_trait::async_trait]
impl FromS3Response
for (
ListenBucketNotificationResponse,
Box<dyn Stream<Item = Result<NotificationRecords, Error>> + Unpin + Send>,
)
{
async fn from_s3response<'a>(
req: S3Request<'a>,
resp: reqwest::Response,
) -> Result<Self, Error> {
let headers = resp.headers().clone();
let body_stream = resp.bytes_stream();
let body_stream = body_stream
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)));
let stream_reader = StreamReader::new(body_stream);
let record_stream = Box::pin(stream::unfold(
stream_reader,
move |mut reader| async move {
loop {
let mut line = String::new();
match reader.read_line(&mut line).await {
Ok(n) => {
if n == 0 {
return None;
}
let s = line.trim();
if s.is_empty() {
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(&s).map_err(|e| e.into());
return Some((records_res, reader));
}
Err(e) => return Some((Err(e.into()), reader)),
}
}
},
));
Ok((
ListenBucketNotificationResponse {
headers,
region: req.get_computed_region(),
bucket: req.bucket.unwrap().to_string(),
},
Box::new(record_stream),
))
}
}

View File

@ -33,13 +33,13 @@ use std::fmt;
pub struct S3Request<'a> {
client: &'a Client,
method: Method,
region: Option<&'a str>,
bucket: Option<&'a str>,
object: Option<&'a str>,
query_params: Multimap,
headers: Multimap,
body: Option<Vec<u8>>,
pub method: Method,
pub region: Option<&'a str>,
pub bucket: Option<&'a str>,
pub object: Option<&'a str>,
pub query_params: Multimap,
pub headers: Multimap,
pub body: Option<Vec<u8>>,
// Computed region
inner_region: String,
@ -90,6 +90,10 @@ impl<'a> S3Request<'a> {
self
}
pub fn get_computed_region(&self) -> String {
self.inner_region.clone()
}
pub async fn execute(&mut self) -> Result<reqwest::Response, Error> {
// Lookup the region of the bucket if provided.
self.inner_region = if let Some(bucket) = self.bucket {
@ -592,126 +596,165 @@ impl<'a> SelectRequest<'a> {
}
}
#[derive(Clone, Debug)]
/// Progress information of [select_object_content()](crate::s3::client::Client::select_object_content) API
#[derive(Clone, Debug)]
pub struct SelectProgress {
pub bytes_scanned: usize,
pub bytes_progressed: usize,
pub bytes_returned: usize,
}
#[derive(Debug, Deserialize, Serialize)]
/// User identity contains principal ID
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct UserIdentity {
#[serde(alias = "principalId")]
pub principal_id: Option<String>,
#[serde(alias = "principalId", default)]
pub principal_id: String,
}
/// Owner identity contains principal ID
pub type OwnerIdentity = UserIdentity;
#[derive(Debug, Deserialize, Serialize)]
/// Request parameters contain principal ID, region and source IP address
pub struct RequestParameters {
#[serde(alias = "principalId")]
pub principal_id: Option<String>,
#[serde(alias = "region")]
pub region: Option<String>,
#[serde(alias = "sourceIPAddress")]
pub source_ip_address: Option<String>,
/// Request parameters contain principal ID, region and source IP address, but
/// they are represented as a string-to-string map in the MinIO server. So we
/// provide methods to fetch the known fields and a map for underlying
/// representation.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct RequestParameters(HashMap<String, String>);
impl RequestParameters {
pub fn principal_id(&self) -> Option<&String> {
self.0.get("principalId")
}
pub fn region(&self) -> Option<&String> {
self.0.get("region")
}
pub fn source_ip_address(&self) -> Option<&String> {
self.0.get("sourceIPAddress")
}
pub fn get_map(&self) -> &HashMap<String, String> {
&self.0
}
}
#[derive(Debug, Deserialize, Serialize)]
/// Response elements information
pub struct ResponseElements {
#[serde(alias = "content-length")]
pub content_length: Option<String>,
#[serde(alias = "x-amz-request-id")]
pub x_amz_request_id: Option<String>,
#[serde(alias = "x-minio-deployment-id")]
pub x_minio_deployment_id: Option<String>,
#[serde(alias = "x-minio-origin-endpoint")]
pub x_minio_origin_endpoint: Option<String>,
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// Response elements information: they are represented as a string-to-string
/// map in the MinIO server. So we provide methods to fetch the known fields and
/// a map for underlying representation.
pub struct ResponseElements(HashMap<String, String>);
impl ResponseElements {
pub fn content_length(&self) -> Option<&String> {
self.0.get("content-length")
}
pub fn x_amz_request_id(&self) -> Option<&String> {
self.0.get("x-amz-request-id")
}
pub fn x_minio_deployment_id(&self) -> Option<&String> {
self.0.get("x-minio-deployment-id")
}
pub fn x_amz_id_2(&self) -> Option<&String> {
self.0.get("x-amz-id-2")
}
pub fn x_minio_origin_endpoint(&self) -> Option<&String> {
self.0.get("x-minio-origin-endpoint")
}
pub fn get_map(&self) -> &HashMap<String, String> {
&self.0
}
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// S3 bucket information
pub struct S3Bucket {
#[serde(alias = "name")]
pub name: Option<String>,
#[serde(alias = "arn")]
pub arn: Option<String>,
#[serde(alias = "ownerIdentity")]
pub owner_identity: Option<OwnerIdentity>,
#[serde(alias = "name", default)]
pub name: String,
#[serde(alias = "arn", default)]
pub arn: String,
#[serde(alias = "ownerIdentity", default)]
pub owner_identity: OwnerIdentity,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// S3 object information
pub struct S3Object {
#[serde(alias = "key")]
pub key: Option<String>,
#[serde(alias = "key", default)]
pub key: String,
#[serde(alias = "size")]
pub size: Option<usize>,
pub size: Option<u64>,
#[serde(alias = "eTag")]
pub etag: Option<String>,
#[serde(alias = "contentType")]
pub content_type: Option<String>,
#[serde(alias = "userMetadata")]
pub user_metadata: Option<HashMap<String, String>>,
#[serde(alias = "sequencer")]
pub sequencer: Option<String>,
#[serde(alias = "versionId", default)]
pub version_id: String,
#[serde(alias = "sequencer", default)]
pub sequencer: String,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// S3 definitions for NotificationRecord
pub struct S3 {
#[serde(alias = "s3SchemaVersion")]
pub s3_schema_version: Option<String>,
#[serde(alias = "configurationId")]
pub configuration_id: Option<String>,
#[serde(alias = "bucket")]
pub bucket: Option<S3Bucket>,
#[serde(alias = "object")]
pub object: Option<S3Object>,
#[serde(alias = "s3SchemaVersion", default)]
pub s3_schema_version: String,
#[serde(alias = "configurationId", default)]
pub configuration_id: String,
#[serde(alias = "bucket", default)]
pub bucket: S3Bucket,
#[serde(alias = "object", default)]
pub object: S3Object,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
/// Source information
pub struct Source {
#[serde(alias = "host")]
pub host: Option<String>,
#[serde(alias = "host", default)]
pub host: String,
#[serde(alias = "port")]
pub port: Option<String>,
#[serde(alias = "userAgent")]
pub user_agent: Option<String>,
#[serde(alias = "userAgent", default)]
pub user_agent: String,
}
#[derive(Debug, Deserialize, Serialize)]
/// Notification record information
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct NotificationRecord {
#[serde(alias = "eventVersion")]
pub event_version: Option<String>,
#[serde(alias = "eventSource")]
pub event_source: Option<String>,
#[serde(alias = "awsRegion")]
pub aws_region: Option<String>,
#[serde(alias = "eventTime")]
pub event_time: Option<String>,
#[serde(alias = "eventName")]
pub event_name: Option<String>,
#[serde(alias = "userIdentity")]
pub user_identity: Option<UserIdentity>,
#[serde(alias = "requestParameters")]
pub request_parameters: Option<RequestParameters>,
#[serde(alias = "responseElements")]
pub response_elements: Option<ResponseElements>,
#[serde(alias = "s3")]
pub s3: Option<S3>,
#[serde(alias = "source")]
pub source: Option<Source>,
#[serde(alias = "eventVersion", default)]
pub event_version: String,
#[serde(alias = "eventSource", default)]
pub event_source: String,
#[serde(alias = "awsRegion", default)]
pub aws_region: String,
#[serde(
alias = "eventTime",
default,
with = "crate::s3::utils::aws_date_format"
)]
pub event_time: UtcTime,
#[serde(alias = "eventName", default)]
pub event_name: String,
#[serde(alias = "userIdentity", default)]
pub user_identity: UserIdentity,
#[serde(alias = "requestParameters", default)]
pub request_parameters: RequestParameters,
#[serde(alias = "responseElements", default)]
pub response_elements: ResponseElements,
#[serde(alias = "s3", default)]
pub s3: S3,
#[serde(alias = "source", default)]
pub source: Source,
}
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone)]
/// Contains notification records
pub struct NotificationRecords {
#[serde(alias = "Records")]

View File

@ -129,6 +129,26 @@ pub fn from_iso8601utc(s: &str) -> Result<UtcTime, ParseError> {
))
}
pub mod aws_date_format {
use super::{from_iso8601utc, to_iso8601utc, UtcTime};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(date: &UtcTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&to_iso8601utc(date.clone()))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<UtcTime, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(from_iso8601utc(&s).map_err(serde::de::Error::custom)?)
}
}
/// Parses HTTP header value to time
pub fn from_http_header_value(s: &str) -> Result<UtcTime, ParseError> {
Ok(DateTime::<Utc>::from_naive_utc_and_offset(

View File

@ -30,13 +30,13 @@ use minio::s3::args::*;
use minio::s3::client::Client;
use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl;
use minio::s3::types::NotificationRecords;
use minio::s3::types::ToStream;
use minio::s3::types::{
CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo,
NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields,
RetentionMode, SelectRequest, SuffixFilterRule,
};
use minio::s3::types::{NotificationRecords, S3Api};
use minio::s3::utils::{to_iso8601utc, utc_now};
struct RandReader {
@ -554,24 +554,23 @@ impl ClientTest {
.unwrap();
let event_fn = |event: NotificationRecords| {
for record in event.records.iter() {
if let Some(s3) = &record.s3 {
if let Some(object) = &s3.object {
if let Some(key) = &object.key {
if name == *key {
sender.send(true).unwrap();
}
return false;
}
}
let record = event.records.iter().next();
if let Some(record) = record {
let key = &record.s3.object.key;
if name == *key {
sender.send(true).unwrap();
return false;
}
}
sender.send(false).unwrap();
false
};
let args = ListenBucketNotificationArgs::new(&test_bucket).unwrap();
let (_, mut event_stream) = client.listen_bucket_notification(args).await.unwrap();
let (_, mut event_stream) = client
.listen_bucket_notification(&test_bucket)
.send()
.await
.unwrap();
while let Some(event) = event_stream.next().await {
let event = event.unwrap();
if !event_fn(event) {