Add file read/write ability to ObjectContent (#81)

This commit is contained in:
Aditya Manthramurthy 2024-04-05 19:39:45 -07:00 committed by GitHub
parent 220887f171
commit 6d8031306e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 232 additions and 128 deletions

View File

@ -20,16 +20,20 @@ chrono = "0.4.27"
crc = "3.0.1" crc = "3.0.1"
dashmap = "5.5.3" dashmap = "5.5.3"
derivative = "2.2.0" derivative = "2.2.0"
env_logger = "0.11.2"
futures-util = "0.3.28" futures-util = "0.3.28"
hex = "0.4.3" hex = "0.4.3"
hmac = "0.12.1" hmac = "0.12.1"
home = "0.5.9"
http = "0.2.9" http = "0.2.9"
hyper = { version = "0.14.27", features = ["full"] } hyper = { version = "0.14.27", features = ["full"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
log = "0.4.20"
md5 = "0.7.0" md5 = "0.7.0"
multimap = "0.10.0" multimap = "0.10.0"
os_info = "3.7.0" os_info = "3.7.0"
percent-encoding = "2.3.0" percent-encoding = "2.3.0"
rand = { version = "0.8.5", features = ["small_rng"] }
regex = "1.9.4" regex = "1.9.4"
serde = { version = "1.0.188", features = ["derive"] } serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105" serde_json = "1.0.105"
@ -39,9 +43,6 @@ tokio-stream = "0.1.14"
tokio-util = { version = "0.7.8", features = ["io"] } tokio-util = { version = "0.7.8", features = ["io"] }
urlencoding = "2.1.3" urlencoding = "2.1.3"
xmltree = "0.10.3" xmltree = "0.10.3"
log = "0.4.20"
env_logger = "0.11.2"
home = "0.5.9"
[dependencies.reqwest] [dependencies.reqwest]
version = "0.11.20" version = "0.11.20"
@ -49,7 +50,6 @@ features = ["native-tls", "blocking", "rustls-tls", "stream"]
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
rand = { version = "0.8.5", features = ["small_rng"] }
[[example]] [[example]]
name = "file-uploader" name = "file-uploader"

View File

@ -1,5 +1,6 @@
use log::info; use log::info;
use minio::s3::args::{BucketExistsArgs, MakeBucketArgs}; use minio::s3::args::{BucketExistsArgs, MakeBucketArgs};
use minio::s3::builders::ObjectContent;
use minio::s3::client::ClientBuilder; use minio::s3::client::ClientBuilder;
use minio::s3::creds::StaticProvider; use minio::s3::creds::StaticProvider;
use minio::s3::http::BaseUrl; use minio::s3::http::BaseUrl;
@ -47,8 +48,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!("filename {}", &filename.to_str().unwrap()); info!("filename {}", &filename.to_str().unwrap());
let content = ObjectContent::from(filename);
client client
.put_object_from_file(bucket_name, object_name, filename) .put_object_content(bucket_name, object_name, content)
.send() .send()
.await?; .await?;

View File

@ -13,55 +13,164 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::pin::Pin; use std::path::PathBuf;
use std::{ffi::OsString, path::Path, pin::Pin};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_util::Stream; use futures_util::Stream;
use tokio::io::AsyncRead; use rand::prelude::random;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
type IoResult<T> = Result<T, std::io::Error>; type IoResult<T> = Result<T, std::io::Error>;
pub struct ObjectContent { /// Object content that can be uploaded or downloaded. Can be constructed from a stream of `Bytes`,
r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, /// a file path, or a `Bytes` object.
extra: Option<Bytes>, pub struct ObjectContent(ObjectContentInner);
size: Option<u64>,
enum ObjectContentInner {
Stream(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>),
FilePath(PathBuf),
Bytes(SegmentedBytes),
} }
impl From<Bytes> for ObjectContent { impl From<Bytes> for ObjectContent {
fn from(value: Bytes) -> Self { fn from(value: Bytes) -> Self {
let n = value.len(); ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::from(value)))
ObjectContent {
r: Box::pin(tokio_stream::iter(vec![Ok(value)])),
extra: None,
size: Some(n as u64),
}
} }
} }
impl From<String> for ObjectContent { impl From<String> for ObjectContent {
fn from(value: String) -> Self { fn from(value: String) -> Self {
let n = value.len(); ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::from(
ObjectContent { Bytes::from(value),
r: Box::pin(tokio_stream::iter(vec![Ok(Bytes::from(value))])), )))
extra: None,
size: Some(n as u64),
}
} }
} }
impl From<Vec<u8>> for ObjectContent { impl From<Vec<u8>> for ObjectContent {
fn from(value: Vec<u8>) -> Self { fn from(value: Vec<u8>) -> Self {
let n = value.len(); ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::from(
ObjectContent { Bytes::from(value),
r: Box::pin(tokio_stream::iter(vec![Ok(Bytes::from(value))])), )))
extra: None, }
size: Some(n as u64), }
}
impl From<&Path> for ObjectContent {
fn from(value: &Path) -> Self {
ObjectContent(ObjectContentInner::FilePath(value.to_path_buf()))
}
}
impl Default for ObjectContent {
fn default() -> Self {
ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::new()))
} }
} }
impl ObjectContent { impl ObjectContent {
/// Create a new `ObjectContent` from a stream of `Bytes`.
pub fn new_from_stream(
r: impl Stream<Item = IoResult<Bytes>> + 'static,
size: Option<u64>,
) -> Self {
let r = Box::pin(r);
ObjectContent(ObjectContentInner::Stream(r, size))
}
pub async fn to_stream(
self,
) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>)> {
match self.0 {
ObjectContentInner::Stream(r, size) => Ok((r, size)),
ObjectContentInner::FilePath(path) => {
let file = fs::File::open(&path).await?;
let size = file.metadata().await?.len();
let r = tokio_util::io::ReaderStream::new(file);
Ok((Box::pin(r), Some(size)))
}
ObjectContentInner::Bytes(sb) => {
let k = sb.len();
let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok)));
Ok((r, Some(k as u64)))
}
}
}
pub(crate) async fn to_content_stream(self) -> IoResult<ContentStream> {
let (r, size) = self.to_stream().await?;
Ok(ContentStream::new(r, size))
}
/// Load the content into memory and return a `SegmentedBytes` object.
pub async fn to_segmented_bytes(self) -> IoResult<SegmentedBytes> {
let mut segmented_bytes = SegmentedBytes::new();
let (mut r, _) = self.to_stream().await?;
while let Some(bytes) = r.next().await {
let bytes = bytes?;
if bytes.is_empty() {
break;
}
segmented_bytes.append(bytes);
}
Ok(segmented_bytes)
}
/// Write the content to a file. This function will return the total number
/// of bytes written to the file. It first writes the content to a temporary
/// file and then renames the temporary file to the final file path. The
/// temporary file will be located in the same directory as the final file
/// path.
///
/// If the file already exists, it will be replaced. If the parent directory
/// does not exist, an attempt to create it will be made.
pub async fn to_file(self, file_path: &Path) -> IoResult<u64> {
if file_path.is_dir() {
return Err(std::io::Error::other("path is a directory"));
}
let parent_dir = file_path.parent().ok_or(std::io::Error::other(format!(
"path {:?} does not have a parent directory",
file_path
)))?;
if !parent_dir.is_dir() {
fs::create_dir_all(parent_dir).await?;
}
let file_name = file_path.file_name().ok_or(std::io::Error::other(
"could not get filename component of path",
))?;
let mut tmp_file_name: OsString = file_name.to_os_string();
tmp_file_name.push(format!("_{}", random::<u64>()));
let tmp_file_path = parent_dir
.to_path_buf()
.join(Path::new(tmp_file_name.as_os_str()));
let mut total = 0;
{
let mut fp = fs::File::open(&tmp_file_path).await?;
let (mut r, _) = self.to_stream().await?;
while let Some(bytes) = r.next().await {
let bytes = bytes?;
if bytes.is_empty() {
break;
}
total += bytes.len() as u64;
fp.write_all(&bytes).await?;
}
fp.flush().await?;
}
fs::rename(&tmp_file_path, file_path).await?;
Ok(total)
}
}
pub(crate) struct ContentStream {
r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>,
extra: Option<Bytes>,
size: Option<u64>,
}
impl ContentStream {
pub fn new(r: impl Stream<Item = IoResult<Bytes>> + 'static, size: Option<u64>) -> Self { pub fn new(r: impl Stream<Item = IoResult<Bytes>> + 'static, size: Option<u64>) -> Self {
let r = Box::pin(r); let r = Box::pin(r);
Self { Self {
@ -79,24 +188,10 @@ impl ObjectContent {
} }
} }
pub fn from_reader(r: impl AsyncRead + Send + Sync + 'static, size: Option<u64>) -> Self {
let pinned = Box::pin(r);
let r = tokio_util::io::ReaderStream::new(pinned);
Self {
r: Box::pin(r),
extra: None,
size,
}
}
pub fn get_size(&self) -> Option<u64> { pub fn get_size(&self) -> Option<u64> {
self.size self.size
} }
pub fn stream(self) -> impl Stream<Item = IoResult<Bytes>> {
self.r
}
// Read as many bytes as possible up to `n` and return a `SegmentedBytes` // Read as many bytes as possible up to `n` and return a `SegmentedBytes`
// object. // object.
pub async fn read_upto(&mut self, n: usize) -> IoResult<SegmentedBytes> { pub async fn read_upto(&mut self, n: usize) -> IoResult<SegmentedBytes> {
@ -134,20 +229,9 @@ impl ObjectContent {
} }
Ok(segmented_bytes) Ok(segmented_bytes)
} }
pub async fn to_segmented_bytes(mut self) -> IoResult<SegmentedBytes> {
let mut segmented_bytes = SegmentedBytes::new();
while let Some(bytes) = self.r.next().await {
let bytes = bytes?;
if bytes.is_empty() {
break;
}
segmented_bytes.append(bytes);
}
Ok(segmented_bytes)
}
} }
/// An aggregated collection of `Bytes` objects.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SegmentedBytes { pub struct SegmentedBytes {
segments: Vec<Vec<Bytes>>, segments: Vec<Vec<Bytes>>,
@ -200,6 +284,14 @@ impl SegmentedBytes {
} }
} }
pub fn into_iter(self) -> SegmentedBytesIntoIterator {
SegmentedBytesIntoIterator {
sb: self,
current_segment: 0,
current_segment_index: 0,
}
}
// Copy all the content into a single `Bytes` object. // Copy all the content into a single `Bytes` object.
pub fn to_bytes(&self) -> Bytes { pub fn to_bytes(&self) -> Bytes {
let mut buf = BytesMut::with_capacity(self.total_size); let mut buf = BytesMut::with_capacity(self.total_size);
@ -212,21 +304,13 @@ impl SegmentedBytes {
} }
} }
impl From<Bytes> for SegmentedBytes { pub struct SegmentedBytesIntoIterator {
fn from(bytes: Bytes) -> Self { sb: SegmentedBytes,
let mut sb = SegmentedBytes::new();
sb.append(bytes);
sb
}
}
pub struct SegmentedBytesIterator<'a> {
sb: &'a SegmentedBytes,
current_segment: usize, current_segment: usize,
current_segment_index: usize, current_segment_index: usize,
} }
impl Iterator for SegmentedBytesIterator<'_> { impl Iterator for SegmentedBytesIntoIterator {
type Item = Bytes; type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
@ -237,10 +321,70 @@ impl Iterator for SegmentedBytesIterator<'_> {
if self.current_segment_index >= segment.len() { if self.current_segment_index >= segment.len() {
self.current_segment += 1; self.current_segment += 1;
self.current_segment_index = 0; self.current_segment_index = 0;
return self.next(); return Iterator::next(self);
} }
let bytes = &segment[self.current_segment_index]; let bytes = &segment[self.current_segment_index];
self.current_segment_index += 1; self.current_segment_index += 1;
Some(bytes.clone()) Some(bytes.clone())
} }
} }
impl IntoIterator for SegmentedBytes {
type Item = Bytes;
type IntoIter = SegmentedBytesIntoIterator;
fn into_iter(self) -> Self::IntoIter {
SegmentedBytesIntoIterator {
sb: self,
current_segment: 0,
current_segment_index: 0,
}
}
}
pub struct SegmentedBytesIterator<'a> {
sb: &'a SegmentedBytes,
current_segment: usize,
current_segment_index: usize,
}
impl<'a> Iterator for SegmentedBytesIterator<'a> {
type Item = Bytes;
fn next(&mut self) -> Option<Self::Item> {
if self.current_segment >= self.sb.segments.len() {
return None;
}
let segment = &self.sb.segments[self.current_segment];
if self.current_segment_index >= segment.len() {
self.current_segment += 1;
self.current_segment_index = 0;
return Iterator::next(self);
}
let bytes = &segment[self.current_segment_index];
self.current_segment_index += 1;
Some(bytes.clone())
}
}
impl<'a> IntoIterator for &'a SegmentedBytes {
type Item = Bytes;
type IntoIter = SegmentedBytesIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
SegmentedBytesIterator {
sb: self,
current_segment: 0,
current_segment_index: 0,
}
}
}
impl From<Bytes> for SegmentedBytes {
fn from(bytes: Bytes) -> Self {
let mut sb = SegmentedBytes::new();
sb.append(bytes);
sb
}
}

View File

@ -13,17 +13,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::{ use std::{collections::HashMap, sync::Arc};
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use bytes::BytesMut; use bytes::BytesMut;
use http::Method; use http::Method;
use crate::s3::{ use crate::s3::{
builders::ObjectContent, builders::ContentStream,
client::Client, client::Client,
error::Error, error::Error,
response::{ response::{
@ -35,7 +31,7 @@ use crate::s3::{
utils::{check_bucket_name, md5sum_hash, merge, to_iso8601utc, urlencode, Multimap}, utils::{check_bucket_name, md5sum_hash, merge, to_iso8601utc, urlencode, Multimap},
}; };
use super::SegmentedBytes; use super::{ObjectContent, SegmentedBytes};
/// Argument for /// Argument for
/// [create_multipart_upload()](crate::s3::client::Client::create_multipart_upload) /// [create_multipart_upload()](crate::s3::client::Client::create_multipart_upload)
@ -632,12 +628,11 @@ pub struct PutObjectContent {
content_type: String, content_type: String,
// source data // source data
input_reader: Option<ObjectContent>, input_content: ObjectContent,
file_path: Option<PathBuf>,
// Computed. // Computed.
// expected_parts: Option<u16>, // expected_parts: Option<u16>,
reader: ObjectContent, content_stream: ContentStream,
part_count: u16, part_count: u16,
} }
@ -646,8 +641,7 @@ impl PutObjectContent {
PutObjectContent { PutObjectContent {
bucket: bucket.to_string(), bucket: bucket.to_string(),
object: object.to_string(), object: object.to_string(),
input_reader: Some(content.into()), input_content: content.into(),
file_path: None,
client: None, client: None,
extra_headers: None, extra_headers: None,
extra_query_params: None, extra_query_params: None,
@ -659,29 +653,7 @@ impl PutObjectContent {
legal_hold: false, legal_hold: false,
part_size: None, part_size: None,
content_type: String::from("application/octet-stream"), content_type: String::from("application/octet-stream"),
reader: ObjectContent::empty(), content_stream: ContentStream::empty(),
part_count: 0,
}
}
pub fn from_file(bucket: &str, object: &str, file_path: &Path) -> Self {
PutObjectContent {
bucket: bucket.to_string(),
object: object.to_string(),
input_reader: None,
file_path: Some(file_path.to_path_buf()),
client: None,
extra_headers: None,
extra_query_params: None,
region: None,
user_metadata: None,
sse: None,
tags: None,
retention: None,
legal_hold: false,
part_size: None,
content_type: String::from("application/octet-stream"),
reader: ObjectContent::empty(),
part_count: 0, part_count: 0,
} }
} }
@ -750,18 +722,13 @@ impl PutObjectContent {
))); )));
} }
if self.input_reader.is_none() { let input_content = std::mem::replace(&mut self.input_content, ObjectContent::default());
// This unwrap is safe as the public API ensures that the file_path self.content_stream = input_content
// or the reader is always set. .to_content_stream()
let file_path = self.file_path.as_ref().unwrap(); .await
let file = tokio::fs::File::open(file_path).await?; .map_err(|e| Error::IOError(e))?;
let size = file.metadata().await?.len();
self.reader = ObjectContent::from_reader(file, Some(size));
} else {
self.reader = self.input_reader.take().unwrap();
}
let object_size = self.reader.get_size(); let object_size = self.content_stream.get_size();
let (psize, expected_parts) = calc_part_info(object_size, self.part_size)?; let (psize, expected_parts) = calc_part_info(object_size, self.part_size)?;
assert_ne!(expected_parts, Some(0)); assert_ne!(expected_parts, Some(0));
self.part_size = Some(psize); self.part_size = Some(psize);
@ -775,7 +742,7 @@ impl PutObjectContent {
} }
// Read the first part. // Read the first part.
let seg_bytes = self.reader.read_upto(psize as usize).await?; let seg_bytes = self.content_stream.read_upto(psize as usize).await?;
// In the first part read, if: // In the first part read, if:
// //
@ -839,7 +806,7 @@ impl PutObjectContent {
if let Some(v) = first_part.take() { if let Some(v) = first_part.take() {
v v
} else { } else {
self.reader.read_upto(psize as usize).await? self.content_stream.read_upto(psize as usize).await?
} }
}; };
part_number += 1; part_number += 1;

View File

@ -15,8 +15,6 @@
//! S3 APIs for uploading objects. //! S3 APIs for uploading objects.
use std::path::Path;
use super::Client; use super::Client;
use crate::s3::{ use crate::s3::{
builders::{ builders::{
@ -76,13 +74,4 @@ impl Client {
) -> PutObjectContent { ) -> PutObjectContent {
PutObjectContent::new(bucket, object, content).client(self) PutObjectContent::new(bucket, object, content).client(self)
} }
pub fn put_object_from_file(
&self,
bucket: &str,
object: &str,
file_path: &Path,
) -> PutObjectContent {
PutObjectContent::from_file(bucket, object, file_path).client(self)
}
} }

View File

@ -26,3 +26,5 @@ pub mod signer;
pub mod sse; pub mod sse;
pub mod types; pub mod types;
pub mod utils; pub mod utils;
pub use client::{Client, ClientBuilder};

View File

@ -50,7 +50,7 @@ impl FromS3Response for GetObjectResponse2 {
result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}); });
let content = ObjectContent::new(body, Some(content_length)); let content = ObjectContent::new_from_stream(body, Some(content_length));
Ok(GetObjectResponse2 { Ok(GetObjectResponse2 {
headers: header_map, headers: header_map,

View File

@ -322,7 +322,7 @@ impl ClientTest {
.put_object_content( .put_object_content(
&self.test_bucket, &self.test_bucket,
&object_name, &object_name,
ObjectContent::new(data_src, Some(*size)), ObjectContent::new_from_stream(data_src, Some(*size)),
) )
.send() .send()
.await .await