Add new put and get APIs for objects (#78)

- put_object_content -> streaming object uploads

- put_object_from_file -> upload file

- put_object, create_multipart_uload, abort_multipart_upload,
upload_part, complete_multipart_upload -> S3 APIs for single and
multipart uploads

- get_object -> streaming object downloads
This commit is contained in:
Aditya Manthramurthy 2024-04-02 18:09:54 -07:00 committed by GitHub
parent 3f160cb6c0
commit 54b671ef4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 2053 additions and 107 deletions

View File

@ -30,7 +30,6 @@ md5 = "0.7.0"
multimap = "0.10.0" multimap = "0.10.0"
os_info = "3.7.0" os_info = "3.7.0"
percent-encoding = "2.3.0" percent-encoding = "2.3.0"
rand = "0.8.5"
regex = "1.9.4" regex = "1.9.4"
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105" serde_json = "1.0.105"
@ -50,6 +49,7 @@ features = ["native-tls", "blocking", "rustls-tls", "stream"]
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
rand = { version = "0.8.5", features = ["small_rng"] }
[[example]] [[example]]
name = "file-uploader" name = "file-uploader"

View File

@ -1,6 +1,6 @@
use log::{error, info}; use log::info;
use minio::s3::args::{BucketExistsArgs, MakeBucketArgs, UploadObjectArgs}; use minio::s3::args::{BucketExistsArgs, MakeBucketArgs};
use minio::s3::client::Client; use minio::s3::client::ClientBuilder;
use minio::s3::creds::StaticProvider; use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl; use minio::s3::http::BaseUrl;
use std::path::Path; use std::path::Path;
@ -9,8 +9,7 @@ use std::path::Path;
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init(); // Note: set environment variable RUST_LOG="INFO" to log info and higher env_logger::init(); // Note: set environment variable RUST_LOG="INFO" to log info and higher
//let base_url = "https://play.min.io".parse::<BaseUrl>()?; let base_url = "https://play.min.io".parse::<BaseUrl>()?;
let base_url: BaseUrl = "http://192.168.178.227:9000".parse::<BaseUrl>()?;
info!("Trying to connect to MinIO at: `{:?}`", base_url); info!("Trying to connect to MinIO at: `{:?}`", base_url);
@ -20,13 +19,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
None, None,
); );
let client = Client::new( let client = ClientBuilder::new(base_url.clone())
base_url.clone(), .provider(Some(Box::new(static_provider)))
Some(Box::new(static_provider)), .build()?;
None,
None,
)
.unwrap();
let bucket_name: &str = "asiatrip"; let bucket_name: &str = "asiatrip";
@ -45,28 +40,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
} }
// File we are going to upload to the bucket // File we are going to upload to the bucket
let filename: &Path = Path::new("/home/user/Photos/asiaphotos.zip"); let filename: &Path = Path::new("/tmp/asiaphotos.zip");
// Name of the object that will be stored in the bucket // Name of the object that will be stored in the bucket
let object_name: &str = "asiaphotos-2015.zip"; let object_name: &str = "asiaphotos-2015.zip";
info!("filename {}", &filename.to_str().unwrap()); info!("filename {}", &filename.to_str().unwrap());
// Check if the file exists
let file_exists: bool = filename.exists();
if !file_exists {
error!("File `{}` does not exist!", filename.display());
()
}
// Upload 'filename' as 'object_name' to bucket 'bucket_name'.
client client
.upload_object( .put_object_from_file(bucket_name, object_name, filename)
&mut UploadObjectArgs::new(&bucket_name, &object_name, &filename.to_str().unwrap()) .send()
.unwrap(), .await?;
)
.await
.unwrap();
info!( info!(
"file `{}` is successfully uploaded as object `{object_name}` to bucket `{bucket_name}`.", "file `{}` is successfully uploaded as object `{object_name}` to bucket `{bucket_name}`.",

View File

@ -105,17 +105,17 @@ fn calc_part_info(
) -> Result<(usize, i16), Error> { ) -> Result<(usize, i16), Error> {
if let Some(v) = part_size { if let Some(v) = part_size {
if v < MIN_PART_SIZE { if v < MIN_PART_SIZE {
return Err(Error::InvalidMinPartSize(v)); return Err(Error::InvalidMinPartSize(v as u64));
} }
if v > MAX_PART_SIZE { if v > MAX_PART_SIZE {
return Err(Error::InvalidMaxPartSize(v)); return Err(Error::InvalidMaxPartSize(v as u64));
} }
} }
if let Some(v) = object_size { if let Some(v) = object_size {
if v > MAX_OBJECT_SIZE { if v > MAX_OBJECT_SIZE {
return Err(Error::InvalidObjectSize(v)); return Err(Error::InvalidObjectSize(v as u64));
} }
} else { } else {
if part_size.is_none() { if part_size.is_none() {
@ -142,8 +142,8 @@ fn calc_part_info(
if part_count as u16 > MAX_MULTIPART_COUNT { if part_count as u16 > MAX_MULTIPART_COUNT {
return Err(Error::InvalidPartCount( return Err(Error::InvalidPartCount(
object_size.unwrap(), object_size.unwrap() as u64,
psize, psize as u64,
MAX_MULTIPART_COUNT, MAX_MULTIPART_COUNT,
)); ));
} }

View File

@ -13,9 +13,15 @@
//! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs //! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs
mod buckets; mod buckets;
mod get_object;
mod list_objects; mod list_objects;
mod listen_bucket_notification; mod listen_bucket_notification;
mod object_content;
mod put_object;
pub use buckets::*; pub use buckets::*;
pub use get_object::*;
pub use list_objects::*; pub use list_objects::*;
pub use listen_bucket_notification::*; pub use listen_bucket_notification::*;
pub use object_content::*;
pub use put_object::*;

View File

@ -0,0 +1,220 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use http::Method;
use crate::s3::{
client::Client,
error::Error,
response::GetObjectResponse2,
sse::{Sse, SseCustomerKey},
types::{S3Api, S3Request, ToS3Request},
utils::{check_bucket_name, merge, to_http_header_value, Multimap, UtcTime},
};
#[derive(Debug, Clone, Default)]
pub struct GetObject {
client: Option<Client>,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
bucket: String,
object: String,
version_id: Option<String>,
offset: Option<u64>,
length: Option<u64>,
region: Option<String>,
ssec: Option<SseCustomerKey>,
// Conditionals
match_etag: Option<String>,
not_match_etag: Option<String>,
modified_since: Option<UtcTime>,
unmodified_since: Option<UtcTime>,
}
// builder interface
impl GetObject {
pub fn new(bucket: &str, object: &str) -> Self {
Self {
bucket: bucket.to_string(),
object: object.to_string(),
..Default::default()
}
}
pub fn client(mut self, client: &Client) -> Self {
self.client = Some(client.clone());
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn version_id(mut self, version_id: Option<String>) -> Self {
self.version_id = version_id;
self
}
pub fn offset(mut self, offset: Option<u64>) -> Self {
self.offset = offset;
self
}
pub fn length(mut self, length: Option<u64>) -> Self {
self.length = length;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
pub fn ssec(mut self, ssec: Option<SseCustomerKey>) -> Self {
self.ssec = ssec;
self
}
pub fn match_etag(mut self, etag: Option<String>) -> Self {
self.match_etag = etag;
self
}
pub fn not_match_etag(mut self, etag: Option<String>) -> Self {
self.not_match_etag = etag;
self
}
pub fn modified_since(mut self, time: Option<UtcTime>) -> Self {
self.modified_since = time;
self
}
pub fn unmodified_since(mut self, time: Option<UtcTime>) -> Self {
self.unmodified_since = time;
self
}
}
// internal helpers
impl GetObject {
fn get_range_header_value(&self) -> Option<String> {
let (offset, length) = match self.length {
Some(_) => (Some(self.offset.unwrap_or(0_u64)), self.length),
None => (self.offset, None),
};
if let Some(o) = offset {
let mut range = String::new();
range.push_str("bytes=");
range.push_str(&o.to_string());
range.push('-');
if let Some(l) = length {
range.push_str(&(o + l - 1).to_string());
}
Some(range)
} else {
None
}
}
fn get_headers(&self) -> Multimap {
let mut headers = Multimap::new();
if let Some(val) = self.get_range_header_value() {
headers.insert(String::from("Range"), val);
}
if let Some(v) = &self.match_etag {
headers.insert(String::from("if-match"), v.to_string());
}
if let Some(v) = &self.not_match_etag {
headers.insert(String::from("if-none-match"), v.to_string());
}
if let Some(v) = self.modified_since {
headers.insert(String::from("if-modified-since"), to_http_header_value(v));
}
if let Some(v) = self.unmodified_since {
headers.insert(String::from("if-unmodified-since"), to_http_header_value(v));
}
if let Some(v) = &self.ssec {
merge(&mut headers, &v.headers());
}
headers
}
}
impl ToS3Request for GetObject {
fn to_s3request(&self) -> Result<S3Request, Error> {
check_bucket_name(&self.bucket, true)?;
if self.object.is_empty() {
return Err(Error::InvalidObjectName(String::from(
"object name cannot be empty",
)));
}
let client = self.client.clone().ok_or(Error::NoClientProvided)?;
if let Some(_) = &self.ssec {
if !client.is_secure() {
return Err(Error::SseTlsRequired(None));
}
}
let mut headers = Multimap::new();
if let Some(v) = &self.extra_headers {
merge(&mut headers, v);
}
merge(&mut headers, &self.get_headers());
let mut query_params = Multimap::new();
if let Some(v) = &self.extra_query_params {
merge(&mut query_params, v);
}
if let Some(v) = &self.version_id {
query_params.insert(String::from("versionId"), v.to_string());
}
let req = S3Request::new(
self.client.as_ref().ok_or(Error::NoClientProvided)?,
Method::GET,
)
.region(self.region.as_deref())
.bucket(Some(&self.bucket))
.object(Some(&self.object))
.query_params(query_params)
.headers(headers);
Ok(req)
}
}
impl S3Api for GetObject {
type S3Response = GetObjectResponse2;
}

View File

@ -0,0 +1,242 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use bytes::{Bytes, BytesMut};
use futures_util::Stream;
use tokio::io::AsyncRead;
use tokio_stream::StreamExt;
type IoResult<T> = Result<T, std::io::Error>;
pub struct ObjectContent {
r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>,
extra: Option<Bytes>,
size: Option<u64>,
}
impl From<Bytes> for ObjectContent {
fn from(value: Bytes) -> Self {
let n = value.len();
ObjectContent {
r: Box::pin(tokio_stream::iter(vec![Ok(value)])),
extra: None,
size: Some(n as u64),
}
}
}
impl From<String> for ObjectContent {
fn from(value: String) -> Self {
let n = value.len();
ObjectContent {
r: Box::pin(tokio_stream::iter(vec![Ok(Bytes::from(value))])),
extra: None,
size: Some(n as u64),
}
}
}
impl From<Vec<u8>> for ObjectContent {
fn from(value: Vec<u8>) -> Self {
let n = value.len();
ObjectContent {
r: Box::pin(tokio_stream::iter(vec![Ok(Bytes::from(value))])),
extra: None,
size: Some(n as u64),
}
}
}
impl ObjectContent {
pub fn new(r: impl Stream<Item = IoResult<Bytes>> + 'static, size: Option<u64>) -> Self {
let r = Box::pin(r);
Self {
r,
extra: None,
size,
}
}
pub fn empty() -> Self {
Self {
r: Box::pin(tokio_stream::iter(vec![])),
extra: None,
size: Some(0),
}
}
pub fn from_reader(r: impl AsyncRead + Send + Sync + 'static, size: Option<u64>) -> Self {
let pinned = Box::pin(r);
let r = tokio_util::io::ReaderStream::new(pinned);
Self {
r: Box::pin(r),
extra: None,
size,
}
}
pub fn get_size(&self) -> Option<u64> {
self.size
}
pub fn stream(self) -> impl Stream<Item = IoResult<Bytes>> {
self.r
}
// Read as many bytes as possible up to `n` and return a `SegmentedBytes`
// object.
pub async fn read_upto(&mut self, n: usize) -> IoResult<SegmentedBytes> {
let mut segmented_bytes = SegmentedBytes::new();
let mut remaining = n;
if let Some(extra) = self.extra.take() {
let len = extra.len();
if len <= remaining {
segmented_bytes.append(extra);
remaining -= len;
} else {
segmented_bytes.append(extra.slice(0..remaining));
self.extra = Some(extra.slice(remaining..));
return Ok(segmented_bytes);
}
}
while remaining > 0 {
let bytes = self.r.next().await;
if bytes.is_none() {
break;
}
let bytes = bytes.unwrap()?;
let len = bytes.len();
if len == 0 {
break;
}
if len <= remaining {
segmented_bytes.append(bytes);
remaining -= len;
} else {
segmented_bytes.append(bytes.slice(0..remaining));
self.extra = Some(bytes.slice(remaining..));
break;
}
}
Ok(segmented_bytes)
}
pub async fn to_segmented_bytes(mut self) -> IoResult<SegmentedBytes> {
let mut segmented_bytes = SegmentedBytes::new();
while let Some(bytes) = self.r.next().await {
let bytes = bytes?;
if bytes.is_empty() {
break;
}
segmented_bytes.append(bytes);
}
Ok(segmented_bytes)
}
}
#[derive(Debug, Clone)]
pub struct SegmentedBytes {
segments: Vec<Vec<Bytes>>,
total_size: usize,
}
impl Default for SegmentedBytes {
fn default() -> Self {
Self::new()
}
}
impl SegmentedBytes {
pub fn new() -> Self {
SegmentedBytes {
segments: Vec::new(),
total_size: 0,
}
}
pub fn len(&self) -> usize {
self.total_size
}
pub fn append(&mut self, bytes: Bytes) {
let last_segment = self.segments.last_mut();
if let Some(last_segment) = last_segment {
let last_len = last_segment.last().map(|b| b.len());
if let Some(last_len) = last_len {
if bytes.len() == last_len {
self.total_size += bytes.len();
last_segment.push(bytes);
return;
}
}
}
self.total_size += bytes.len();
self.segments.push(vec![bytes]);
}
pub fn iter(&self) -> SegmentedBytesIterator {
SegmentedBytesIterator {
sb: self,
current_segment: 0,
current_segment_index: 0,
}
}
// Copy all the content into a single `Bytes` object.
pub fn to_bytes(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.total_size);
for segment in &self.segments {
for bytes in segment {
buf.extend_from_slice(&bytes);
}
}
buf.freeze()
}
}
impl From<Bytes> for SegmentedBytes {
fn from(bytes: Bytes) -> Self {
let mut sb = SegmentedBytes::new();
sb.append(bytes);
sb
}
}
pub struct SegmentedBytesIterator<'a> {
sb: &'a SegmentedBytes,
current_segment: usize,
current_segment_index: usize,
}
impl Iterator for SegmentedBytesIterator<'_> {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
if self.current_segment >= self.sb.segments.len() {
return None;
}
let segment = &self.sb.segments[self.current_segment];
if self.current_segment_index >= segment.len() {
self.current_segment += 1;
self.current_segment_index = 0;
return self.next();
}
let bytes = &segment[self.current_segment_index];
self.current_segment_index += 1;
Some(bytes.clone())
}
}

File diff suppressed because it is too large Load Diff

View File

@ -27,8 +27,8 @@ use crate::s3::types::{
ReplicationConfig, RetentionMode, SseConfig, ReplicationConfig, RetentionMode, SseConfig,
}; };
use crate::s3::utils::{ use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, md5sum_hash_sb,
to_amz_date, to_iso8601utc, utc_now, Multimap, merge, sha256_hash_sb, to_amz_date, to_iso8601utc, utc_now, Multimap,
}; };
use async_recursion::async_recursion; use async_recursion::async_recursion;
@ -36,6 +36,7 @@ use bytes::{Buf, Bytes};
use dashmap::DashMap; use dashmap::DashMap;
use hyper::http::Method; use hyper::http::Method;
use reqwest::header::HeaderMap; use reqwest::header::HeaderMap;
use reqwest::Body;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
@ -43,10 +44,12 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use xmltree::Element; use xmltree::Element;
mod get_object;
mod list_objects; mod list_objects;
mod listen_bucket_notification; mod listen_bucket_notification;
mod put_object;
use super::builders::{GetBucketVersioning, ListBuckets}; use super::builders::{GetBucketVersioning, ListBuckets, SegmentedBytes};
/// Client Builder manufactures a Client using given parameters. /// Client Builder manufactures a Client using given parameters.
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -180,6 +183,10 @@ impl Client {
self.base_url.is_aws_host() self.base_url.is_aws_host()
} }
pub fn is_secure(&self) -> bool {
self.base_url.https
}
fn build_headers( fn build_headers(
&self, &self,
headers: &mut Multimap, headers: &mut Multimap,
@ -187,7 +194,7 @@ impl Client {
region: &str, region: &str,
url: &Url, url: &Url,
method: &Method, method: &Method,
data: &[u8], data: Option<&SegmentedBytes>,
) { ) {
headers.insert(String::from("Host"), url.host_header_value()); headers.insert(String::from("Host"), url.host_header_value());
@ -195,6 +202,8 @@ impl Client {
let mut sha256 = String::new(); let mut sha256 = String::new();
match *method { match *method {
Method::PUT | Method::POST => { Method::PUT | Method::POST => {
let empty_sb = SegmentedBytes::new();
let data = data.unwrap_or(&empty_sb);
headers.insert(String::from("Content-Length"), data.len().to_string()); headers.insert(String::from("Content-Length"), data.len().to_string());
if !headers.contains_key("Content-Type") { if !headers.contains_key("Content-Type") {
headers.insert( headers.insert(
@ -203,9 +212,9 @@ impl Client {
); );
} }
if self.provider.is_some() { if self.provider.is_some() {
sha256 = sha256_hash(data); sha256 = sha256_hash_sb(data);
} else if !headers.contains_key("Content-MD5") { } else if !headers.contains_key("Content-MD5") {
md5sum = md5sum_hash(data); md5sum = md5sum_hash_sb(data);
} }
} }
_ => { _ => {
@ -398,10 +407,9 @@ impl Client {
query_params: &Multimap, query_params: &Multimap,
bucket_name: Option<&str>, bucket_name: Option<&str>,
object_name: Option<&str>, object_name: Option<&str>,
data: Option<&[u8]>, body: Option<&SegmentedBytes>,
retry: bool, retry: bool,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
let body = data.unwrap_or_default();
let url = let url =
self.base_url self.base_url
.build_url(method, region, query_params, bucket_name, object_name)?; .build_url(method, region, query_params, bucket_name, object_name)?;
@ -416,7 +424,16 @@ impl Client {
} }
if *method == Method::PUT || *method == Method::POST { if *method == Method::PUT || *method == Method::POST {
req = req.body(body.to_vec()); let mut bytes_vec = vec![];
if let Some(body) = body {
bytes_vec = body.iter().collect();
};
let stream = futures_util::stream::iter(
bytes_vec
.into_iter()
.map(|x| -> Result<_, std::io::Error> { Ok(x.clone()) }),
);
req = req.body(Body::wrap_stream(stream));
} }
let resp = req.send().await?; let resp = req.send().await?;
@ -460,7 +477,30 @@ impl Client {
query_params: &Multimap, query_params: &Multimap,
bucket_name: Option<&str>, bucket_name: Option<&str>,
object_name: Option<&str>, object_name: Option<&str>,
data: Option<&[u8]>, data: Option<Bytes>,
) -> Result<reqwest::Response, Error> {
let sb = data.map(SegmentedBytes::from);
self.execute2(
method,
region,
headers,
query_params,
bucket_name,
object_name,
sb.as_ref(),
)
.await
}
pub async fn execute2(
&self,
method: Method,
region: &String,
headers: &mut Multimap,
query_params: &Multimap,
bucket_name: Option<&str>,
object_name: Option<&str>,
data: Option<&SegmentedBytes>,
) -> Result<reqwest::Response, Error> { ) -> Result<reqwest::Response, Error> {
let res = self let res = self
.do_execute( .do_execute(
@ -557,7 +597,7 @@ impl Client {
} }
/// Executes [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) S3 API /// Executes [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) S3 API
pub async fn abort_multipart_upload( pub async fn abort_multipart_upload_old(
&self, &self,
args: &AbortMultipartUploadArgs<'_>, args: &AbortMultipartUploadArgs<'_>,
) -> Result<AbortMultipartUploadResponse, Error> { ) -> Result<AbortMultipartUploadResponse, Error> {
@ -644,7 +684,7 @@ impl Client {
} }
/// Executes [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) S3 API /// Executes [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) S3 API
pub async fn complete_multipart_upload( pub async fn complete_multipart_upload_old(
&self, &self,
args: &CompleteMultipartUploadArgs<'_>, args: &CompleteMultipartUploadArgs<'_>,
) -> Result<CompleteMultipartUploadResponse, Error> { ) -> Result<CompleteMultipartUploadResponse, Error> {
@ -659,7 +699,7 @@ impl Client {
data.push_str(&s); data.push_str(&s);
} }
data.push_str("</CompleteMultipartUpload>"); data.push_str("</CompleteMultipartUpload>");
let b = data.as_bytes(); let data: Bytes = data.into();
let mut headers = Multimap::new(); let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers { if let Some(v) = &args.extra_headers {
@ -669,7 +709,7 @@ impl Client {
String::from("Content-Type"), String::from("Content-Type"),
String::from("application/xml"), String::from("application/xml"),
); );
headers.insert(String::from("Content-MD5"), md5sum_hash(b)); headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref()));
let mut query_params = Multimap::new(); let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params { if let Some(v) = &args.extra_query_params {
@ -685,7 +725,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(b), Some(data),
) )
.await?; .await?;
let header_map = resp.headers().clone(); let header_map = resp.headers().clone();
@ -762,7 +802,7 @@ impl Client {
object_size += size; object_size += size;
if object_size > MAX_OBJECT_SIZE { if object_size > MAX_OBJECT_SIZE {
return Err(Error::InvalidObjectSize(object_size)); return Err(Error::InvalidObjectSize(object_size as u64));
} }
if size > MAX_PART_SIZE { if size > MAX_PART_SIZE {
@ -838,7 +878,7 @@ impl Client {
cmu_args.extra_query_params = args.extra_query_params; cmu_args.extra_query_params = args.extra_query_params;
cmu_args.region = args.region; cmu_args.region = args.region;
cmu_args.headers = Some(&headers); cmu_args.headers = Some(&headers);
let resp = self.create_multipart_upload(&cmu_args).await?; let resp = self.create_multipart_upload_old(&cmu_args).await?;
upload_id.push_str(&resp.upload_id); upload_id.push_str(&resp.upload_id);
let mut part_number = 0_u16; let mut part_number = 0_u16;
@ -932,7 +972,7 @@ impl Client {
let mut cmu_args = let mut cmu_args =
CompleteMultipartUploadArgs::new(args.bucket, args.object, upload_id, &parts)?; CompleteMultipartUploadArgs::new(args.bucket, args.object, upload_id, &parts)?;
cmu_args.region = args.region; cmu_args.region = args.region;
self.complete_multipart_upload(&cmu_args).await self.complete_multipart_upload_old(&cmu_args).await
} }
pub async fn compose_object( pub async fn compose_object(
@ -949,7 +989,7 @@ impl Client {
let res = self.do_compose_object(args, &mut upload_id).await; let res = self.do_compose_object(args, &mut upload_id).await;
if res.is_err() && !upload_id.is_empty() { if res.is_err() && !upload_id.is_empty() {
let amuargs = &AbortMultipartUploadArgs::new(args.bucket, args.object, &upload_id)?; let amuargs = &AbortMultipartUploadArgs::new(args.bucket, args.object, &upload_id)?;
self.abort_multipart_upload(amuargs).await?; self.abort_multipart_upload_old(amuargs).await?;
} }
res res
@ -1064,7 +1104,7 @@ impl Client {
} }
/// Executes [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html) S3 API /// Executes [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html) S3 API
pub async fn create_multipart_upload( pub async fn create_multipart_upload_old(
&self, &self,
args: &CreateMultipartUploadArgs<'_>, args: &CreateMultipartUploadArgs<'_>,
) -> Result<CreateMultipartUploadResponse, Error> { ) -> Result<CreateMultipartUploadResponse, Error> {
@ -1189,7 +1229,9 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(b"<LegalHold><Status>OFF</Status></LegalHold>"), Some(Bytes::from(
&b"<LegalHold><Status>OFF</Status></LegalHold>"[..],
)),
) )
.await?; .await?;
@ -1456,7 +1498,7 @@ impl Client {
args: &DownloadObjectArgs<'_>, args: &DownloadObjectArgs<'_>,
) -> Result<DownloadObjectResponse, Error> { ) -> Result<DownloadObjectResponse, Error> {
let mut resp = self let mut resp = self
.get_object(&GetObjectArgs { .get_object_old(&GetObjectArgs {
extra_headers: args.extra_headers, extra_headers: args.extra_headers,
extra_query_params: args.extra_query_params, extra_query_params: args.extra_query_params,
region: args.region, region: args.region,
@ -1523,7 +1565,9 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(b"<LegalHold><Status>ON</Status></LegalHold>"), Some(Bytes::from(
&b"<LegalHold><Status>ON</Status></LegalHold>"[..],
)),
) )
.await?; .await?;
@ -1852,7 +1896,10 @@ impl Client {
GetBucketVersioning::new(bucket).client(self) GetBucketVersioning::new(bucket).client(self)
} }
pub async fn get_object(&self, args: &GetObjectArgs<'_>) -> Result<reqwest::Response, Error> { pub async fn get_object_old(
&self,
args: &GetObjectArgs<'_>,
) -> Result<reqwest::Response, Error> {
if args.ssec.is_some() && !self.base_url.https { if args.ssec.is_some() && !self.base_url.https {
return Err(Error::SseTlsRequired(None)); return Err(Error::SseTlsRequired(None));
} }
@ -2239,7 +2286,7 @@ impl Client {
let body = match data.is_empty() { let body = match data.is_empty() {
true => None, true => None,
false => Some(data.as_bytes()), false => Some(data.into()),
}; };
let resp = self let resp = self
@ -2364,7 +2411,7 @@ impl Client {
cmuargs.region = args.region; cmuargs.region = args.region;
cmuargs.headers = Some(&headers); cmuargs.headers = Some(&headers);
let resp = self.create_multipart_upload(&cmuargs).await?; let resp = self.create_multipart_upload_old(&cmuargs).await?;
upload_id.push_str(&resp.upload_id); upload_id.push_str(&resp.upload_id);
} }
@ -2386,7 +2433,7 @@ impl Client {
}; };
upargs.headers = Some(&ssec_headers); upargs.headers = Some(&ssec_headers);
let resp = self.upload_part(&upargs).await?; let resp = self.upload_part_old(&upargs).await?;
parts.push(Part { parts.push(Part {
number: part_number as u16, number: part_number as u16,
etag: resp.etag.clone(), etag: resp.etag.clone(),
@ -2397,10 +2444,10 @@ impl Client {
CompleteMultipartUploadArgs::new(args.bucket, args.object, upload_id, &parts)?; CompleteMultipartUploadArgs::new(args.bucket, args.object, upload_id, &parts)?;
cmuargs.region = args.region; cmuargs.region = args.region;
self.complete_multipart_upload(&cmuargs).await self.complete_multipart_upload_old(&cmuargs).await
} }
pub async fn put_object( pub async fn put_object_old(
&self, &self,
args: &mut PutObjectArgs<'_>, args: &mut PutObjectArgs<'_>,
) -> Result<PutObjectResponse, Error> { ) -> Result<PutObjectResponse, Error> {
@ -2423,7 +2470,7 @@ impl Client {
if res.is_err() && !upload_id.is_empty() { if res.is_err() && !upload_id.is_empty() {
let amuargs = &AbortMultipartUploadArgs::new(args.bucket, args.object, &upload_id)?; let amuargs = &AbortMultipartUploadArgs::new(args.bucket, args.object, &upload_id)?;
self.abort_multipart_upload(amuargs).await?; self.abort_multipart_upload_old(amuargs).await?;
} }
res res
@ -2454,7 +2501,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(args.data), Some(Bytes::copy_from_slice(args.data)),
) )
.await?; .await?;
let header_map = resp.headers(); let header_map = resp.headers();
@ -2573,7 +2620,7 @@ impl Client {
data.push_str("</Object>"); data.push_str("</Object>");
} }
data.push_str("</Delete>"); data.push_str("</Delete>");
let b = data.as_bytes(); let data: Bytes = data.into();
let mut headers = Multimap::new(); let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers { if let Some(v) = &args.extra_headers {
@ -2589,7 +2636,7 @@ impl Client {
String::from("Content-Type"), String::from("Content-Type"),
String::from("application/xml"), String::from("application/xml"),
); );
headers.insert(String::from("Content-MD5"), md5sum_hash(b)); headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref()));
let mut query_params = Multimap::new(); let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params { if let Some(v) = &args.extra_query_params {
@ -2605,7 +2652,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(b), Some(data),
) )
.await?; .await?;
let header_map = resp.headers().clone(); let header_map = resp.headers().clone();
@ -2704,7 +2751,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(args.config.to_xml().as_bytes()), Some(args.config.to_xml().into()),
) )
.await?; .await?;
@ -2740,7 +2787,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(args.config.to_xml().as_bytes()), Some(args.config.to_xml().into()),
) )
.await?; .await?;
@ -2776,7 +2823,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(args.config.to_xml().as_bytes()), Some(args.config.to_xml().into()),
) )
.await?; .await?;
@ -2812,7 +2859,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(args.config.as_bytes()), Some(args.config.to_string().into()),
) )
.await?; .await?;
@ -2848,7 +2895,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(args.config.to_xml().as_bytes()), Some(args.config.to_xml().into()),
) )
.await?; .await?;
@ -2901,7 +2948,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(data.as_bytes()), Some(data.into()),
) )
.await?; .await?;
@ -2954,7 +3001,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(data.as_bytes()), Some(data.into()),
) )
.await?; .await?;
@ -2990,7 +3037,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
None, None,
Some(args.config.to_xml().as_bytes()), Some(args.config.to_xml().into()),
) )
.await?; .await?;
@ -3050,7 +3097,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(data.as_bytes()), Some(data.into()),
) )
.await?; .await?;
@ -3108,7 +3155,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(data.as_bytes()), Some(data.into()),
) )
.await?; .await?;
@ -3132,13 +3179,13 @@ impl Client {
let region = self.get_region(args.bucket, args.region).await?; let region = self.get_region(args.bucket, args.region).await?;
let data = args.request.to_xml(); let data = args.request.to_xml();
let b = data.as_bytes(); let data: Bytes = data.into();
let mut headers = Multimap::new(); let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers { if let Some(v) = &args.extra_headers {
merge(&mut headers, v); merge(&mut headers, v);
} }
headers.insert(String::from("Content-MD5"), md5sum_hash(b)); headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref()));
let mut query_params = Multimap::new(); let mut query_params = Multimap::new();
if let Some(v) = &args.extra_query_params { if let Some(v) = &args.extra_query_params {
@ -3155,7 +3202,7 @@ impl Client {
&query_params, &query_params,
Some(args.bucket), Some(args.bucket),
Some(args.object), Some(args.object),
Some(b), Some(data),
) )
.await?, .await?,
&region, &region,
@ -3209,7 +3256,7 @@ impl Client {
) -> Result<UploadObjectResponse, Error> { ) -> Result<UploadObjectResponse, Error> {
let mut file = File::open(args.filename)?; let mut file = File::open(args.filename)?;
self.put_object(&mut PutObjectArgs { self.put_object_old(&mut PutObjectArgs {
extra_headers: args.extra_headers, extra_headers: args.extra_headers,
extra_query_params: args.extra_query_params, extra_query_params: args.extra_query_params,
region: args.region, region: args.region,
@ -3231,7 +3278,7 @@ impl Client {
} }
/// Executes [UploadPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html) S3 API /// Executes [UploadPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html) S3 API
pub async fn upload_part( pub async fn upload_part_old(
&self, &self,
args: &UploadPartArgs<'_>, args: &UploadPartArgs<'_>,
) -> Result<UploadPartResponse, Error> { ) -> Result<UploadPartResponse, Error> {

View File

@ -0,0 +1,27 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! S3 APIs for downloading objects.
use crate::s3::builders::GetObject;
use super::Client;
impl Client {
/// Create a GetObject request builder.
pub fn get_object(&self, bucket: &str, object: &str) -> GetObject {
GetObject::new(bucket, object).client(self)
}
}

View File

@ -0,0 +1,88 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! S3 APIs for uploading objects.
use std::path::Path;
use super::Client;
use crate::s3::{
builders::{
AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, ObjectContent,
PutObject, PutObjectContent, SegmentedBytes, UploadPart,
},
types::Part,
};
impl Client {
/// Create a PutObject request builder. This is a lower-level API that
/// performs a non-multipart object upload.
pub fn put_object(&self, bucket: &str, object: &str, data: SegmentedBytes) -> PutObject {
PutObject::new(bucket, object, data).client(self)
}
/// Create a CreateMultipartUpload request builder.
pub fn create_multipart_upload(&self, bucket: &str, object: &str) -> CreateMultipartUpload {
CreateMultipartUpload::new(bucket, object).client(self)
}
pub fn abort_multipart_upload(
&self,
bucket: &str,
object: &str,
upload_id: &str,
) -> AbortMultipartUpload {
AbortMultipartUpload::new(bucket, object, upload_id).client(self)
}
pub fn complete_multipart_upload(
&self,
bucket: &str,
object: &str,
upload_id: &str,
parts: Vec<Part>,
) -> CompleteMultipartUpload {
CompleteMultipartUpload::new(bucket, object, upload_id, parts).client(self)
}
pub fn upload_part(
&self,
bucket: &str,
object: &str,
upload_id: &str,
part_number: u16,
data: SegmentedBytes,
) -> UploadPart {
UploadPart::new(bucket, object, upload_id, part_number, data).client(self)
}
pub fn put_object_content(
&self,
bucket: &str,
object: &str,
content: impl Into<ObjectContent>,
) -> PutObjectContent {
PutObjectContent::new(bucket, object, content).client(self)
}
pub fn put_object_from_file(
&self,
bucket: &str,
object: &str,
file_path: &Path,
) -> PutObjectContent {
PutObjectContent::from_file(bucket, object, file_path).client(self)
}
}

View File

@ -79,11 +79,11 @@ pub enum Error {
EmptyParts(String), EmptyParts(String),
InvalidRetentionMode(String), InvalidRetentionMode(String),
InvalidRetentionConfig(String), InvalidRetentionConfig(String),
InvalidMinPartSize(usize), InvalidMinPartSize(u64),
InvalidMaxPartSize(usize), InvalidMaxPartSize(u64),
InvalidObjectSize(usize), InvalidObjectSize(u64),
MissingPartSize, MissingPartSize,
InvalidPartCount(usize, usize, u16), InvalidPartCount(u64, u64, u16),
SseTlsRequired(Option<String>), SseTlsRequired(Option<String>),
InsufficientData(usize, usize), InsufficientData(usize, usize),
InvalidLegalHold(String), InvalidLegalHold(String),
@ -111,6 +111,7 @@ pub enum Error {
InvalidObjectLockConfig(String), InvalidObjectLockConfig(String),
NoClientProvided, NoClientProvided,
TagDecodingError(String, String), TagDecodingError(String, String),
ContentLengthUnknown,
} }
impl std::error::Error for Error {} impl std::error::Error for Error {}
@ -216,6 +217,7 @@ impl fmt::Display for Error {
Error::InvalidObjectLockConfig(m) => write!(f, "{}", m), Error::InvalidObjectLockConfig(m) => write!(f, "{}", m),
Error::NoClientProvided => write!(f, "no client provided"), Error::NoClientProvided => write!(f, "no client provided"),
Error::TagDecodingError(input, error_message) => write!(f, "tag decoding failed: {} on input '{}'", error_message, input), Error::TagDecodingError(input, error_message) => write!(f, "tag decoding failed: {} on input '{}'", error_message, input),
Error::ContentLengthUnknown => write!(f, "content length is unknown"),
} }
} }
} }

View File

@ -223,7 +223,7 @@ impl FromStr for BaseUrl {
/// Convert a string to a BaseUrl. /// Convert a string to a BaseUrl.
/// ///
/// Enables use of [`str`]'s [`parse`] method to create a [`BaseUrl`]. /// Enables use of [`str::parse`] method to create a [`BaseUrl`].
/// ///
/// # Examples /// # Examples
/// ///

View File

@ -32,14 +32,21 @@ use crate::s3::utils::{
}; };
mod buckets; mod buckets;
mod get_object;
mod list_objects; mod list_objects;
mod listen_bucket_notification; mod listen_bucket_notification;
mod put_object;
pub use buckets::{GetBucketVersioningResponse, ListBucketsResponse}; pub use buckets::{GetBucketVersioningResponse, ListBucketsResponse};
pub use get_object::GetObjectResponse2;
pub use list_objects::{ pub use list_objects::{
ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response, ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response,
}; };
pub use listen_bucket_notification::ListenBucketNotificationResponse; pub use listen_bucket_notification::ListenBucketNotificationResponse;
pub use put_object::{
AbortMultipartUploadResponse2, CompleteMultipartUploadResponse2,
CreateMultipartUploadResponse2, PutObjectResponse as PutObjectResponse2, UploadPartResponse2,
};
#[derive(Debug)] #[derive(Debug)]
/// Base response for bucket operation /// Base response for bucket operation

View File

@ -0,0 +1,64 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use tokio_stream::StreamExt;
use crate::s3::{
builders::ObjectContent,
error::Error,
types::{FromS3Response, S3Request},
};
pub struct GetObjectResponse2 {
pub headers: http::HeaderMap,
pub region: String,
pub bucket_name: String,
pub object_name: String,
pub version_id: Option<String>,
pub content: ObjectContent,
}
#[async_trait]
impl FromS3Response for GetObjectResponse2 {
async fn from_s3response<'a>(
req: S3Request<'a>,
response: reqwest::Response,
) -> Result<Self, Error> {
let header_map = response.headers().clone();
let version_id = match header_map.get("x-amz-version-id") {
Some(v) => Some(v.to_str()?.to_string()),
None => None,
};
let content_length = response
.content_length()
.ok_or(Error::ContentLengthUnknown)?;
let body = response.bytes_stream().map(|result| {
result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
});
let content = ObjectContent::new(body, Some(content_length));
Ok(GetObjectResponse2 {
headers: header_map,
region: req.region.unwrap_or("").to_string(),
bucket_name: req.bucket.unwrap().to_string(),
object_name: req.object.unwrap().to_string(),
version_id,
content,
})
}
}

View File

@ -0,0 +1,97 @@
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2023 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use bytes::Buf;
use http::HeaderMap;
use xmltree::Element;
use crate::s3::{
error::Error,
types::{FromS3Response, S3Request},
utils::get_text,
};
#[derive(Debug, Clone)]
pub struct PutObjectResponse {
pub headers: HeaderMap,
pub bucket_name: String,
pub object_name: String,
pub location: String,
pub etag: String,
pub version_id: Option<String>,
}
#[async_trait]
impl FromS3Response for PutObjectResponse {
async fn from_s3response<'a>(
req: S3Request<'a>,
response: reqwest::Response,
) -> Result<Self, Error> {
let header_map = response.headers();
Ok(PutObjectResponse {
headers: header_map.clone(),
bucket_name: req.bucket.unwrap().to_string(),
object_name: req.object.unwrap().to_string(),
location: req.region.unwrap_or("").to_string(),
etag: match header_map.get("etag") {
Some(v) => v.to_str()?.to_string().trim_matches('"').to_string(),
_ => String::new(),
},
version_id: match header_map.get("x-amz-version-id") {
Some(v) => Some(v.to_str()?.to_string()),
None => None,
},
})
}
}
pub type CreateMultipartUploadResponse2 = UploadIdResponse2;
#[derive(Debug, Clone)]
pub struct UploadIdResponse2 {
pub headers: HeaderMap,
pub region: String,
pub bucket_name: String,
pub object_name: String,
pub upload_id: String,
}
#[async_trait]
impl FromS3Response for UploadIdResponse2 {
async fn from_s3response<'a>(
req: S3Request<'a>,
response: reqwest::Response,
) -> Result<Self, Error> {
let header_map = response.headers().clone();
let body = response.bytes().await?;
let root = Element::parse(body.reader())?;
Ok(CreateMultipartUploadResponse2 {
headers: header_map.clone(),
region: req.region.unwrap_or("").to_string(),
bucket_name: req.bucket.unwrap().to_string(),
object_name: req.object.unwrap().to_string(),
upload_id: get_text(&root, "UploadId")?,
})
}
}
pub type AbortMultipartUploadResponse2 = UploadIdResponse2;
pub type CompleteMultipartUploadResponse2 = PutObjectResponse;
pub type UploadPartResponse2 = PutObjectResponse;

View File

@ -19,7 +19,7 @@ use crate::s3::utils;
use std::any::Any; use std::any::Any;
/// Base server side encryption /// Base server side encryption
pub trait Sse: std::fmt::Debug { pub trait Sse: std::fmt::Debug + Send + Sync {
fn headers(&self) -> utils::Multimap; fn headers(&self) -> utils::Multimap;
fn copy_headers(&self) -> utils::Multimap; fn copy_headers(&self) -> utils::Multimap;
fn tls_required(&self) -> bool; fn tls_required(&self) -> bool;

View File

@ -15,6 +15,7 @@
//! Various types for S3 API requests and responses //! Various types for S3 API requests and responses
use super::builders::SegmentedBytes;
use super::client::Client; use super::client::Client;
use crate::s3::error::Error; use crate::s3::error::Error;
use crate::s3::utils::{ use crate::s3::utils::{
@ -39,7 +40,7 @@ pub struct S3Request<'a> {
pub object: Option<&'a str>, pub object: Option<&'a str>,
pub query_params: Multimap, pub query_params: Multimap,
pub headers: Multimap, pub headers: Multimap,
pub body: Option<Vec<u8>>, pub body: Option<SegmentedBytes>,
// Computed region // Computed region
inner_region: String, inner_region: String,
@ -85,7 +86,7 @@ impl<'a> S3Request<'a> {
self self
} }
pub fn body(mut self, body: Option<Vec<u8>>) -> Self { pub fn body(mut self, body: Option<SegmentedBytes>) -> Self {
self.body = body; self.body = body;
self self
} }
@ -104,14 +105,14 @@ impl<'a> S3Request<'a> {
// Execute the API request. // Execute the API request.
self.client self.client
.execute( .execute2(
self.method.clone(), self.method.clone(),
&self.inner_region, &self.inner_region,
&mut self.headers, &mut self.headers,
&self.query_params, &self.query_params,
self.bucket, self.bucket,
self.object, self.object,
self.body.as_ref().map(|x| x.as_slice()), self.body.as_ref(),
) )
.await .await
} }

View File

@ -34,6 +34,8 @@ use xmltree::Element;
use crate::s3::error::Error; use crate::s3::error::Error;
use super::builders::SegmentedBytes;
/// Date and time with UTC timezone /// Date and time with UTC timezone
pub type UtcTime = DateTime<Utc>; pub type UtcTime = DateTime<Utc>;
@ -71,11 +73,27 @@ pub fn sha256_hash(data: &[u8]) -> String {
format!("{:x}", hasher.finalize()) format!("{:x}", hasher.finalize())
} }
pub fn sha256_hash_sb(sb: &SegmentedBytes) -> String {
let mut hasher = Sha256::new();
for data in sb.iter() {
hasher.update(data);
}
format!("{:x}", hasher.finalize())
}
/// Gets bas64 encoded MD5 hash of given data /// Gets bas64 encoded MD5 hash of given data
pub fn md5sum_hash(data: &[u8]) -> String { pub fn md5sum_hash(data: &[u8]) -> String {
b64encode(md5compute(data).as_slice()) b64encode(md5compute(data).as_slice())
} }
pub fn md5sum_hash_sb(sb: &SegmentedBytes) -> String {
let mut hasher = md5::Context::new();
for data in sb.iter() {
hasher.consume(data);
}
b64encode(hasher.compute().as_slice())
}
/// Gets current UTC time /// Gets current UTC time
pub fn utc_now() -> UtcTime { pub fn utc_now() -> UtcTime {
chrono::offset::Utc::now() chrono::offset::Utc::now()

View File

@ -14,16 +14,23 @@
// limitations under the License. // limitations under the License.
use async_std::task; use async_std::task;
use bytes::Bytes;
use chrono::Duration; use chrono::Duration;
use futures_util::Stream;
use hyper::http::Method; use hyper::http::Method;
use rand::distributions::{Alphanumeric, DistString}; use minio::s3::builders::ObjectContent;
use rand::{
distributions::{Alphanumeric, DistString},
rngs::SmallRng,
SeedableRng,
};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::BufReader; use std::io::BufReader;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::{fs, io}; use std::{fs, io};
use tokio::sync::mpsc; use tokio::{io::AsyncRead, sync::mpsc};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use minio::s3::args::*; use minio::s3::args::*;
@ -67,6 +74,72 @@ impl std::io::Read for RandReader {
} }
} }
struct RandSrc {
size: u64,
rng: SmallRng,
}
impl RandSrc {
fn new(size: u64) -> RandSrc {
let rng = SmallRng::from_entropy();
RandSrc { size, rng }
}
}
impl Stream for RandSrc {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
if self.size == 0 {
return task::Poll::Ready(None);
}
let bytes_read = match self.size > 64 * 1024 {
true => 64 * 1024,
false => self.size as usize,
};
let this = self.get_mut();
let mut buf = vec![0; bytes_read];
let random: &mut dyn rand::RngCore = &mut this.rng;
random.fill_bytes(&mut buf);
this.size -= bytes_read as u64;
task::Poll::Ready(Some(Ok(Bytes::from(buf))))
}
}
impl AsyncRead for RandSrc {
fn poll_read(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
read_buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let buf = read_buf.initialize_unfilled();
let bytes_read = match self.size > (buf.len() as u64) {
true => buf.len(),
false => self.size as usize,
};
let this = self.get_mut();
if bytes_read > 0 {
let random: &mut dyn rand::RngCore = &mut this.rng;
random.fill_bytes(&mut buf[0..bytes_read]);
}
this.size -= bytes_read as u64;
read_buf.advance(bytes_read);
std::task::Poll::Ready(Ok(()))
}
}
fn rand_bucket_name() -> String { fn rand_bucket_name() -> String {
Alphanumeric Alphanumeric
.sample_string(&mut rand::thread_rng(), 8) .sample_string(&mut rand::thread_rng(), 8)
@ -183,7 +256,7 @@ impl ClientTest {
let object_name = rand_object_name(); let object_name = rand_object_name();
let size = 16_usize; let size = 16_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -213,7 +286,7 @@ impl ClientTest {
let object_name = rand_object_name(); let object_name = rand_object_name();
let size: usize = 16 + 5 * 1024 * 1024; let size: usize = 16 + 5 * 1024 * 1024;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -239,11 +312,41 @@ impl ClientTest {
.unwrap(); .unwrap();
} }
async fn get_object(&self) { async fn put_object_content(&self) {
let object_name = rand_object_name();
let sizes = vec![16_u64, 5 * 1024 * 1024, 16 + 5 * 1024 * 1024];
for size in sizes.iter() {
let data_src = RandSrc::new(*size);
let rsp = self
.client
.put_object_content(
&self.test_bucket,
&object_name,
ObjectContent::new(data_src, Some(*size)),
)
.send()
.await
.unwrap();
let etag = rsp.etag;
let resp = self
.client
.stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await
.unwrap();
assert_eq!(resp.size, *size as usize);
assert_eq!(resp.etag, etag);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await
.unwrap();
}
}
async fn get_object_old(&self) {
let object_name = rand_object_name(); let object_name = rand_object_name();
let data = "hello, world"; let data = "hello, world";
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -257,7 +360,7 @@ impl ClientTest {
.unwrap(); .unwrap();
let resp = self let resp = self
.client .client
.get_object(&GetObjectArgs::new(&self.test_bucket, &object_name).unwrap()) .get_object_old(&GetObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await .await
.unwrap(); .unwrap();
let got = resp.text().await.unwrap(); let got = resp.text().await.unwrap();
@ -268,6 +371,28 @@ impl ClientTest {
.unwrap(); .unwrap();
} }
async fn get_object(&self) {
let object_name = rand_object_name();
let data = Bytes::from("hello, world".to_string().into_bytes());
self.client
.put_object_content(&self.test_bucket, &object_name, data.clone())
.send()
.await
.unwrap();
let resp = self
.client
.get_object(&self.test_bucket, &object_name)
.send()
.await
.unwrap();
let got = resp.content.to_segmented_bytes().await.unwrap().to_bytes();
assert_eq!(got, data);
self.client
.remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap())
.await
.unwrap();
}
fn get_hash(filename: &String) -> String { fn get_hash(filename: &String) -> String {
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
let mut file = fs::File::open(filename).unwrap(); let mut file = fs::File::open(filename).unwrap();
@ -357,7 +482,7 @@ impl ClientTest {
let object_name = rand_object_name(); let object_name = rand_object_name();
let size = 0_usize; let size = 0_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -404,7 +529,7 @@ impl ClientTest {
let object_name = rand_object_name(); let object_name = rand_object_name();
let size = 0_usize; let size = 0_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -467,7 +592,7 @@ impl ClientTest {
let body = String::from("Year,Make,Model,Description,Price\n") + &data; let body = String::from("Year,Make,Model,Description,Price\n") + &data;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -580,7 +705,7 @@ impl ClientTest {
let size = 16_usize; let size = 16_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -607,7 +732,7 @@ impl ClientTest {
let size = 16_usize; let size = 16_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&src_object_name, &src_object_name,
@ -656,7 +781,7 @@ impl ClientTest {
let size = 16_usize; let size = 16_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&src_object_name, &src_object_name,
@ -945,7 +1070,7 @@ impl ClientTest {
let size = 16_usize; let size = 16_usize;
self.client self.client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
@ -1057,7 +1182,7 @@ impl ClientTest {
let size = 16_usize; let size = 16_usize;
let obj_resp = self let obj_resp = self
.client .client
.put_object( .put_object_old(
&mut PutObjectArgs::new( &mut PutObjectArgs::new(
&bucket_name, &bucket_name,
&object_name, &object_name,
@ -1195,6 +1320,12 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("[Multipart] put_object()"); println!("[Multipart] put_object()");
ctest.put_object_multipart().await; ctest.put_object_multipart().await;
println!("put_object_stream()");
ctest.put_object_content().await;
println!("get_object_old()");
ctest.get_object_old().await;
println!("get_object()"); println!("get_object()");
ctest.get_object().await; ctest.get_object().await;