moved Tokio runtime from general dependency to dev dependency (#167)

* moved Tokio runtime from general dependency to dev dependency
* reduced number of worker threads in tests
This commit is contained in:
Henk-Jan Lebbink 2025-06-18 11:26:29 +02:00 committed by GitHub
parent 1af3f72c12
commit 6f904b452a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 305 additions and 298 deletions

View File

@ -24,6 +24,8 @@ ring = ["dep:ring"]
[dependencies]
async-recursion = "1.1.1"
async-std = { version = "1.13.1", features = ["attributes"] }
async-stream = "0.3.6"
async-trait = "0.1.88"
base64 = "0.22.1"
byteorder = "1.5.0"
@ -48,18 +50,16 @@ ring = { version = "0.17.14", optional = true, default-features = false, feature
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
sha2 = { version = "0.10.8", optional = true }
tokio = { version = "1.45.1", features = ["full"] }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.15", features = ["io"] }
urlencoding = "2.1.3"
xmltree = "0.11.0"
futures = "0.3.31"
http = "1.3.1"
[dev-dependencies]
tokio = { version = "1.45.1", features = ["full"] }
minio_common = { path = "./common" }
async-std = { version = "1.13.1", features = ["attributes", "tokio1"] }
clap = { version = "4.5.39", features = ["derive"] }
clap = { version = "4.5.40", features = ["derive"] }
quickcheck = "1.0.3"
criterion = "0.6.0"

View File

@ -20,8 +20,8 @@ use minio::s3::builders::{DeleteBucketTagging, GetBucketTagging, PutBucketTaggin
use minio::s3::types::S3Api;
use minio_common::example::create_tags_example;
pub(crate) fn bench_put_bucket_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_put_bucket_tagging") {
pub(crate) async fn bench_put_bucket_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_put_bucket_tagging").await {
return;
}
benchmark_s3_api(
@ -34,8 +34,8 @@ pub(crate) fn bench_put_bucket_tagging(criterion: &mut Criterion) {
},
)
}
pub(crate) fn bench_get_bucket_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_get_bucket_tagging") {
pub(crate) async fn bench_get_bucket_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_get_bucket_tagging").await {
return;
}
benchmark_s3_api(
@ -54,8 +54,8 @@ pub(crate) fn bench_get_bucket_tagging(criterion: &mut Criterion) {
|ctx| GetBucketTagging::new(ctx.client.clone(), ctx.bucket.clone()),
)
}
pub(crate) fn bench_delete_bucket_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_delete_bucket_tagging") {
pub(crate) async fn bench_delete_bucket_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_delete_bucket_tagging").await {
return;
}
benchmark_s3_api(

View File

@ -18,8 +18,8 @@ use crate::common_benches::{Ctx2, benchmark_s3_api, skip_express_mode};
use criterion::Criterion;
use minio::s3::builders::{GetBucketVersioning, PutBucketVersioning, VersioningStatus};
pub(crate) fn bench_get_bucket_versioning(criterion: &mut Criterion) {
if skip_express_mode("bench_get_bucket_versioning") {
pub(crate) async fn bench_get_bucket_versioning(criterion: &mut Criterion) {
if skip_express_mode("bench_get_bucket_versioning").await {
return;
}
benchmark_s3_api(
@ -29,8 +29,8 @@ pub(crate) fn bench_get_bucket_versioning(criterion: &mut Criterion) {
|ctx| GetBucketVersioning::new(ctx.client.clone(), ctx.bucket.clone()),
)
}
pub(crate) fn bench_put_bucket_versioning(criterion: &mut Criterion) {
if skip_express_mode("bench_put_bucket_versioning") {
pub(crate) async fn bench_put_bucket_versioning(criterion: &mut Criterion) {
if skip_express_mode("bench_put_bucket_versioning").await {
return;
}
benchmark_s3_api(

View File

@ -24,8 +24,8 @@ use minio_common::test_context::TestContext;
use tokio::task;
#[allow(dead_code)]
pub(crate) fn bench_object_append(criterion: &mut Criterion) {
if !TestContext::new_from_env().client.is_minio_express() {
pub(crate) async fn bench_object_append(criterion: &mut Criterion) {
if !TestContext::new_from_env().client.is_minio_express().await {
println!("Skipping benchmark because it is NOT running in MinIO Express mode");
return;
}

View File

@ -19,8 +19,8 @@ use criterion::Criterion;
use minio::s3::builders::{GetObjectLegalHold, PutObjectLegalHold};
use minio::s3::types::S3Api;
pub(crate) fn bench_put_object_legal_hold(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_legal_hold") {
pub(crate) async fn bench_put_object_legal_hold(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_legal_hold").await {
return;
}
benchmark_s3_api(
@ -33,8 +33,8 @@ pub(crate) fn bench_put_object_legal_hold(criterion: &mut Criterion) {
},
)
}
pub(crate) fn bench_get_object_legal_hold(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_legal_hold") {
pub(crate) async fn bench_get_object_legal_hold(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_legal_hold").await {
return;
}
benchmark_s3_api(

View File

@ -18,8 +18,8 @@ use criterion::Criterion;
use minio::s3::builders::{DeleteObjectLockConfig, GetObjectLockConfig, PutObjectLockConfig};
use minio_common::example::create_object_lock_config_example;
pub(crate) fn bench_put_object_lock_config(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_lock_config") {
pub(crate) async fn bench_put_object_lock_config(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_lock_config").await {
return;
}
benchmark_s3_api(
@ -32,8 +32,8 @@ pub(crate) fn bench_put_object_lock_config(criterion: &mut Criterion) {
},
)
}
pub(crate) fn bench_get_object_lock_config(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_lock_config") {
pub(crate) async fn bench_get_object_lock_config(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_lock_config").await {
return;
}
benchmark_s3_api(
@ -43,8 +43,8 @@ pub(crate) fn bench_get_object_lock_config(criterion: &mut Criterion) {
|ctx| GetObjectLockConfig::new(ctx.client.clone(), ctx.bucket.clone()),
)
}
pub(crate) fn bench_delete_object_lock_config(criterion: &mut Criterion) {
if skip_express_mode("bench_delete_object_lock_config") {
pub(crate) async fn bench_delete_object_lock_config(criterion: &mut Criterion) {
if skip_express_mode("bench_delete_object_lock_config").await {
return;
}
benchmark_s3_api(

View File

@ -21,8 +21,8 @@ use minio::s3::response::PutObjectRetentionResponse;
use minio::s3::types::{RetentionMode, S3Api};
use minio::s3::utils::utc_now;
pub(crate) fn bench_put_object_retention(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_retention") {
pub(crate) async fn bench_put_object_retention(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_retention").await {
return;
}
benchmark_s3_api(
@ -36,8 +36,8 @@ pub(crate) fn bench_put_object_retention(criterion: &mut Criterion) {
},
)
}
pub(crate) fn bench_get_object_retention(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_retention") {
pub(crate) async fn bench_get_object_retention(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_retention").await {
return;
}
benchmark_s3_api(

View File

@ -21,8 +21,8 @@ use minio::s3::response::PutObjectTaggingResponse;
use minio::s3::types::S3Api;
use minio_common::example::create_tags_example;
pub(crate) fn bench_put_object_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_tagging") {
pub(crate) async fn bench_put_object_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_put_object_tagging").await {
return;
}
benchmark_s3_api(
@ -35,8 +35,8 @@ pub(crate) fn bench_put_object_tagging(criterion: &mut Criterion) {
},
)
}
pub(crate) fn bench_get_object_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_tagging") {
pub(crate) async fn bench_get_object_tagging(criterion: &mut Criterion) {
if skip_express_mode("bench_get_object_tagging").await {
return;
}
benchmark_s3_api(

View File

@ -167,8 +167,8 @@ pub(crate) fn benchmark_s3_api<ApiType, GlobalSetupFuture>(
group.finish();
}
pub(crate) fn skip_express_mode(bench_name: &str) -> bool {
let skip = TestContext::new_from_env().client.is_minio_express();
pub(crate) async fn skip_express_mode(bench_name: &str) -> bool {
let skip = TestContext::new_from_env().client.is_minio_express().await;
if skip {
println!("Skipping benchmark '{}' (MinIO Express mode)", bench_name);
}

View File

@ -5,15 +5,15 @@ edition = "2024"
[dependencies]
minio = {path = ".." }
tokio = { version = "1.44.1", features = ["full"] }
tokio-stream = "0.1.17"
tokio = { version = "1.45.1", features = ["full"] }
async-std = "1.13.1"
rand = { version = "0.8.5", features = ["small_rng"] }
bytes = "1.10.1"
log = "0.4.27"
chrono = "0.4.40"
reqwest = "0.12.15"
chrono = "0.4.41"
reqwest = "0.12.20"
http = "1.3.1"
futures = "0.3.31"
[lib]
name = "minio_common"

View File

@ -13,13 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_std::stream::Stream;
use async_std::task;
use bytes::Bytes;
use rand::SeedableRng;
use futures::io::AsyncRead;
use rand::prelude::SmallRng;
use rand::{RngCore, SeedableRng};
use std::io;
use tokio::io::AsyncRead;
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct RandSrc {
size: u64,
@ -64,26 +66,21 @@ impl Stream for RandSrc {
impl AsyncRead for RandSrc {
fn poll_read(
self: std::pin::Pin<&mut Self>,
_cx: &mut task::Context<'_>,
read_buf: &mut tokio::io::ReadBuf<'_>,
) -> task::Poll<io::Result<()>> {
let buf = read_buf.initialize_unfilled();
let bytes_read = match self.size > (buf.len() as u64) {
true => buf.len(),
false => self.size as usize,
};
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let this = self.as_mut().get_mut();
let this = self.get_mut();
if bytes_read > 0 {
let random: &mut dyn rand::RngCore = &mut this.rng;
random.fill_bytes(&mut buf[0..bytes_read]);
if this.size == 0 {
return Poll::Ready(Ok(0)); // EOF
}
this.size -= bytes_read as u64;
let to_read = std::cmp::min(this.size as usize, buf.len());
read_buf.advance(bytes_read);
task::Poll::Ready(Ok(()))
this.rng.fill_bytes(&mut buf[..to_read]);
this.size -= to_read as u64;
Poll::Ready(Ok(to_read))
}
}

View File

@ -28,7 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init(); // Note: set environment variable RUST_LOG="INFO" to log info and higher
let client: Client = create_client_on_localhost()?;
if !client.is_minio_express() {
if !client.is_minio_express().await {
println!("Need (MinIO) Express mode to run this example");
return Ok(());
}

View File

@ -15,14 +15,6 @@
//! Builders for RemoveObject APIs.
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::{Stream, StreamExt, stream as futures_stream};
use http::Method;
use std::pin::Pin;
use tokio_stream::iter as stream_iter;
use crate::s3::client::MAX_MULTIPART_COUNT;
use crate::s3::multimap::{Multimap, MultimapExt};
use crate::s3::response::DeleteError;
@ -35,6 +27,12 @@ use crate::s3::{
types::{S3Api, S3Request, ToS3Request, ToStream},
utils::{check_bucket_name, md5sum_hash},
};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::iter;
use futures_util::{Stream, StreamExt, stream as futures_stream};
use http::Method;
use std::pin::Pin;
// region: object-to-delete
@ -260,7 +258,7 @@ impl ToS3Request for DeleteObjects {
data.push_str("</Object>");
}
data.push_str("</Delete>");
let data: Bytes = data.into();
let bytes: Bytes = data.into();
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
{
@ -268,7 +266,7 @@ impl ToS3Request for DeleteObjects {
headers.add("x-amz-bypass-governance-retention", "true");
}
headers.add("Content-Type", "application/xml");
headers.add("Content-MD5", md5sum_hash(data.as_ref()));
headers.add("Content-MD5", md5sum_hash(bytes.as_ref()));
}
Ok(S3Request::new(self.client, Method::POST)
@ -276,7 +274,7 @@ impl ToS3Request for DeleteObjects {
.bucket(Some(self.bucket))
.query_params(insert(self.extra_query_params, "delete"))
.headers(headers)
.body(Some(data.into())))
.body(Some(bytes.into())))
}
}
@ -296,7 +294,7 @@ impl ObjectsStream {
impl From<ObjectToDelete> for ObjectsStream {
fn from(delete_object: ObjectToDelete) -> Self {
Self::from_stream(stream_iter(std::iter::once(delete_object)))
Self::from_stream(iter(std::iter::once(delete_object)))
}
}
@ -305,7 +303,7 @@ where
I: Iterator<Item = ObjectToDelete> + Send + Sync + 'static,
{
fn from(keys: I) -> Self {
Self::from_stream(stream_iter(keys))
Self::from_stream(iter(keys))
}
}

View File

@ -72,7 +72,10 @@ impl GetPresignedObjectUrl {
check_bucket_name(&self.bucket, true)?;
check_object_name(&self.object)?;
let region: String = self.client.get_region_cached(&self.bucket, &self.region)?;
let region: String = self
.client
.get_region_cached(&self.bucket, &self.region)
.await?;
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
query_params.add_version(self.version_id.clone());

View File

@ -35,10 +35,10 @@ impl GetPresignedPolicyFormData {
}
pub async fn send(self) -> Result<HashMap<String, String>, Error> {
// NOTE: this send function is async to be comparable with other functions...
let region: String = self
.client
.get_region_cached(&self.policy.bucket, &self.policy.region)?;
.get_region_cached(&self.policy.bucket, &self.policy.region)
.await?;
let creds: Credentials = self.client.shared.provider.as_ref().unwrap().fetch();
self.policy.form_data(

View File

@ -18,7 +18,6 @@ use crate::s3::error::Error;
use crate::s3::lifecycle_config::LifecycleConfig;
use crate::s3::multimap::{Multimap, MultimapExt};
use crate::s3::response::PutBucketLifecycleResponse;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::types::{S3Api, S3Request, ToS3Request};
use crate::s3::utils::{check_bucket_name, insert, md5sum_hash};
use bytes::Bytes;
@ -81,14 +80,13 @@ impl ToS3Request for PutBucketLifecycle {
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
let bytes: Bytes = self.config.to_xml().into();
headers.add("Content-MD5", md5sum_hash(&bytes));
let body: Option<SegmentedBytes> = Some(SegmentedBytes::from(bytes));
headers.add("Content-MD5", md5sum_hash(bytes.as_ref()));
Ok(S3Request::new(self.client, Method::PUT)
.region(self.region)
.bucket(Some(self.bucket))
.query_params(insert(self.extra_query_params, "lifecycle"))
.headers(headers)
.body(body))
.body(Some(bytes.into())))
}
}

View File

@ -283,9 +283,9 @@ impl ToS3Request for CompleteMultipartUpload {
}
}
// Set capacity of the byte-buffer based on the part count - attempting
// Set the capacity of the byte-buffer based on the part count - attempting
// to avoid extra allocations when building the XML payload.
let data: Bytes = {
let bytes: Bytes = {
let mut data = BytesMut::with_capacity(100 * self.parts.len() + 100);
data.extend_from_slice(b"<CompleteMultipartUpload>");
for part in self.parts.iter() {
@ -302,7 +302,7 @@ impl ToS3Request for CompleteMultipartUpload {
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
{
headers.add("Content-Type", "application/xml");
headers.add("Content-MD5", md5sum_hash(data.as_ref()));
headers.add("Content-MD5", md5sum_hash(bytes.as_ref()));
}
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
query_params.add("uploadId", self.upload_id);
@ -313,7 +313,7 @@ impl ToS3Request for CompleteMultipartUpload {
.object(Some(self.object))
.query_params(query_params)
.headers(headers)
.body(Some(data.into())))
.body(Some(bytes.into())))
}
}
// endregion: complete-multipart-upload

View File

@ -17,7 +17,6 @@ use crate::s3::Client;
use crate::s3::error::Error;
use crate::s3::multimap::{Multimap, MultimapExt};
use crate::s3::response::PutObjectLegalHoldResponse;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::types::{S3Api, S3Request, ToS3Request};
use crate::s3::utils::{check_bucket_name, check_object_name, insert, md5sum_hash};
use bytes::Bytes;
@ -88,10 +87,10 @@ impl ToS3Request for PutObjectLegalHold {
Some(true) => "<LegalHold><Status>ON</Status></LegalHold>",
_ => "<LegalHold><Status>OFF</Status></LegalHold>",
};
let bytes: Bytes = Bytes::from(payload);
// TODO consider const payload with precalculated md5
headers.add("Content-MD5", md5sum_hash(payload.as_ref()));
let body: Option<SegmentedBytes> = Some(SegmentedBytes::from(Bytes::from(payload)));
headers.add("Content-MD5", md5sum_hash(bytes.as_ref()));
Ok(S3Request::new(self.client, Method::PUT)
.region(self.region)
@ -99,6 +98,6 @@ impl ToS3Request for PutObjectLegalHold {
.query_params(query_params)
.headers(headers)
.object(Some(self.object))
.body(body))
.body(Some(bytes.into())))
}
}

View File

@ -17,7 +17,6 @@ use crate::s3::Client;
use crate::s3::error::Error;
use crate::s3::multimap::{Multimap, MultimapExt};
use crate::s3::response::PutObjectRetentionResponse;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::types::{RetentionMode, S3Api, S3Request, ToS3Request};
use crate::s3::utils::{
UtcTime, check_bucket_name, check_object_name, insert, md5sum_hash, to_iso8601utc,
@ -108,7 +107,7 @@ impl ToS3Request for PutObjectRetention {
}
}
let data: String = {
let bytes: Bytes = {
let mut data: String = "<Retention>".into();
if let Some(v) = &self.retention_mode {
data.push_str("<Mode>");
@ -121,14 +120,14 @@ impl ToS3Request for PutObjectRetention {
data.push_str("</RetainUntilDate>");
}
data.push_str("</Retention>");
data
Bytes::from(data)
};
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
if self.bypass_governance_mode {
headers.add("x-amz-bypass-governance-retention", "true");
}
headers.add("Content-MD5", md5sum_hash(data.as_ref()));
headers.add("Content-MD5", md5sum_hash(bytes.as_ref()));
let mut query_params: Multimap = insert(self.extra_query_params, "retention");
query_params.add_version(self.version_id);
@ -139,6 +138,6 @@ impl ToS3Request for PutObjectRetention {
.query_params(query_params)
.headers(headers)
.object(Some(self.object))
.body(Some(SegmentedBytes::from(Bytes::from(data)))))
.body(Some(bytes.into())))
}
}

View File

@ -17,7 +17,6 @@ use crate::s3::Client;
use crate::s3::error::Error;
use crate::s3::multimap::{Multimap, MultimapExt};
use crate::s3::response::SelectObjectContentResponse;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::SseCustomerKey;
use crate::s3::types::{S3Api, S3Request, SelectRequest, ToS3Request};
use crate::s3::utils::{check_bucket_name, check_object_name, insert, md5sum_hash};
@ -100,9 +99,7 @@ impl ToS3Request for SelectObjectContent {
return Err(Error::SseTlsRequired(None));
}
}
let region: String = self.client.get_region_cached(&self.bucket, &self.region)?;
let data = self.request.to_xml();
let bytes: Bytes = data.into();
let bytes: Bytes = self.request.to_xml().into();
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
headers.add("Content-MD5", md5sum_hash(bytes.as_ref()));
@ -110,14 +107,12 @@ impl ToS3Request for SelectObjectContent {
let mut query_params: Multimap = insert(self.extra_query_params, "select");
query_params.add("select-type", "2");
let body: Option<SegmentedBytes> = Some(SegmentedBytes::from(bytes));
Ok(S3Request::new(self.client, Method::POST)
.region(Some(region))
.region(self.region)
.bucket(Some(self.bucket))
.query_params(query_params)
.object(Some(self.object))
.headers(headers)
.body(body))
.object(Some(self.object))
.body(Some(bytes.into())))
}
}

View File

@ -36,9 +36,7 @@ use dashmap::DashMap;
use http::HeaderMap;
use hyper::http::Method;
use rand::Rng;
use rand::distributions::Alphanumeric;
use reqwest::Body;
use tokio::task;
mod append_object;
mod bucket_exists;
@ -257,7 +255,7 @@ impl Client {
.build()
}
/// Returns whether is client uses an AWS host.
/// Returns whether this client uses an AWS host.
pub fn is_aws_host(&self) -> bool {
self.shared.base_url.is_aws_host()
}
@ -268,44 +266,38 @@ impl Client {
}
/// Returns whether this client is configured to use the express endpoint and is minio enterprise.
pub fn is_minio_express(&self) -> bool {
if self.shared.express.get().is_some() {
*self.shared.express.get().unwrap()
pub async fn is_minio_express(&self) -> bool {
if let Some(val) = self.shared.express.get() {
*val
} else {
task::block_in_place(|| match tokio::runtime::Runtime::new() {
Ok(rt) => {
// create a random bucket name, and check if it exists,
// we are not interested in the result, just the headers
// which will contain the server type
// Create a random bucket name
let bucket_name: String = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(20)
.map(char::from)
.collect::<String>()
.to_lowercase();
let bucket_name: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(20)
.map(char::from)
.collect::<String>()
.to_lowercase();
let express: bool = rt.block_on(async {
match BucketExists::new(self.clone(), bucket_name).send().await {
Ok(v) => {
if let Some(server) = v.headers.get("server") {
if let Ok(s) = server.to_str() {
return s
.eq_ignore_ascii_case("MinIO Enterprise/S3Express");
}
}
}
Err(e) => {
println!("is_express_internal: error: {e}\nassume false");
}
let express = match BucketExists::new(self.clone(), bucket_name).send().await {
Ok(v) => {
if let Some(server) = v.headers.get("server") {
if let Ok(s) = server.to_str() {
s.eq_ignore_ascii_case("MinIO Enterprise/S3Express")
} else {
false
}
} else {
false
});
self.shared.express.set(express).unwrap_or_default();
express
}
}
Err(_) => false,
})
Err(e) => {
log::warn!("is_express_internal: error: {e}, assume false");
false
}
};
self.shared.express.set(express).unwrap_or_default();
express
}
}

View File

@ -55,7 +55,7 @@ impl Client {
bucket: S,
) -> Result<DeleteBucketResponse, Error> {
let bucket: String = bucket.into();
if self.is_minio_express() {
if self.is_minio_express().await {
let mut stream = self.list_objects(&bucket).to_stream().await;
while let Some(items) = stream.next().await {

View File

@ -19,8 +19,6 @@ use crate::s3::error::Error;
use crate::s3::response::GetRegionResponse;
use crate::s3::types::S3Api;
use tokio::task;
impl Client {
/// Creates a [`GetRegion`] request builder.
///
@ -50,7 +48,7 @@ impl Client {
/// Retrieves the region for the specified bucket name from the cache.
/// If the region is not found in the cache, it is fetched via a call to S3 or MinIO
/// and then stored in the cache for future lookups.
pub async fn get_region_cached_async<S: Into<String>>(
pub async fn get_region_cached<S: Into<String>>(
&self,
bucket: S,
region: &Option<String>, // the region as provided by the S3Request
@ -98,17 +96,4 @@ impl Client {
.insert(bucket, resolved_region.clone());
Ok(resolved_region)
}
/// Retrieves the region for the specified bucket name from the cache.
/// If the region is not found in the cache, it is fetched via a call to S3 or MinIO
/// and then stored in the cache for future lookups.
pub fn get_region_cached(
&self,
bucket: &str,
region: &Option<String>,
) -> Result<String, Error> {
task::block_in_place(|| {
tokio::runtime::Runtime::new()?.block_on(self.get_region_cached_async(bucket, region))
})
}
}

View File

@ -88,7 +88,7 @@ impl ErrorCode {
/// Error response for S3 operations
pub struct ErrorResponse {
/// Headers as returned by the server.
pub headers: HeaderMap,
pub(crate) headers: HeaderMap,
pub code: ErrorCode,
pub message: String,
pub resource: String,
@ -129,11 +129,16 @@ pub enum Error {
StrError(reqwest::header::ToStrError),
IntError(std::num::ParseIntError),
BoolError(std::str::ParseBoolError),
Utf8Error(alloc::string::FromUtf8Error),
Utf8Error(Box<dyn std::error::Error + Send + Sync + 'static>),
/// Occurs when converting Vec<u8> to String (e.g. String::from_utf8)
//FromUtf8Error(alloc::string::FromUtf8Error),
/// Occurs when converting &[u8] to &str (e.g. std::str::from_utf8)
//Utf8Error(std::str::Utf8Error),
JsonError(serde_json::Error),
XmlError(String),
InvalidBucketName(String),
InvalidBaseUrl(String),
InvalidBucketName(String),
UrlBuildError(String),
RegionMismatch(String, String),
S3Error(ErrorResponse),
@ -198,6 +203,7 @@ impl fmt::Display for Error {
Error::IntError(e) => write!(f, "{e}"),
Error::BoolError(e) => write!(f, "{e}"),
Error::Utf8Error(e) => write!(f, "{e}"),
//Error::FromUtf8Error(e) => write!(f, "{e}"),
Error::JsonError(e) => write!(f, "{e}"),
Error::XmlError(m) => write!(f, "{m}"),
Error::InvalidBucketName(m) => write!(f, "{m}"),
@ -397,7 +403,13 @@ impl From<std::str::ParseBoolError> for Error {
impl From<alloc::string::FromUtf8Error> for Error {
fn from(err: alloc::string::FromUtf8Error) -> Self {
Error::Utf8Error(err)
Error::Utf8Error(err.into())
}
}
impl From<std::str::Utf8Error> for Error {
fn from(err: std::str::Utf8Error) -> Self {
Error::Utf8Error(err.into())
}
}

View File

@ -13,15 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use std::{ffi::OsString, path::Path, pin::Pin};
use async_std::io::{ReadExt, WriteExt};
use bytes::Bytes;
use futures_util::Stream;
use futures::stream::{self, Stream, StreamExt};
use rand::prelude::random;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
use std::path::PathBuf;
use std::{ffi::OsString, fs, path::Path, pin::Pin};
use crate::s3::segmented_bytes::SegmentedBytes;
#[cfg(test)]
@ -151,15 +148,30 @@ impl ObjectContent {
) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>> + Send>>, Size)> {
match self.0 {
ObjectContentInner::Stream(r, size) => Ok((r, size)),
ObjectContentInner::FilePath(path) => {
let file = fs::File::open(&path).await?;
let size = file.metadata().await?.len();
let r = tokio_util::io::ReaderStream::new(file);
Ok((Box::pin(r), Some(size).into()))
let mut file = async_std::fs::File::open(&path).await?;
let metadata = file.metadata().await?;
let size = metadata.len();
// Define a stream that reads the file in chunks
let stream = async_stream::try_stream! {
let mut buf = vec![0u8; 8192];
loop {
let n = file.read(&mut buf).await?;
if n == 0 {
break;
}
yield Bytes::copy_from_slice(&buf[..n]);
}
};
Ok((Box::pin(stream), Some(size).into()))
}
ObjectContentInner::Bytes(sb) => {
let k = sb.len();
let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok)));
let r = Box::pin(stream::iter(sb.into_iter().map(Ok)));
Ok((r, Some(k as u64).into()))
}
}
@ -203,7 +215,7 @@ impl ObjectContent {
))
})?;
if !parent_dir.is_dir() {
fs::create_dir_all(parent_dir).await?;
async_std::fs::create_dir_all(parent_dir).await?;
}
let file_name = file_path.file_name().ok_or(std::io::Error::other(
"could not get filename component of path",
@ -215,7 +227,7 @@ impl ObjectContent {
.join(Path::new(tmp_file_name.as_os_str()));
let mut total_bytes_written = 0;
let mut fp = fs::OpenOptions::new()
let mut fp = async_std::fs::OpenOptions::new()
.write(true)
.create(true) // Ensures that the file will be created if it does not already exist
.truncate(true) // Clears the contents (truncates the file size to 0) before writing
@ -231,7 +243,7 @@ impl ObjectContent {
fp.write_all(&bytes).await?;
}
fp.flush().await?;
fs::rename(&tmp_file_path, file_path).await?;
fs::rename(&tmp_file_path, file_path)?;
Ok(total_bytes_written)
}
}
@ -263,7 +275,7 @@ impl ContentStream {
pub fn empty() -> Self {
Self {
r: Box::pin(tokio_stream::iter(vec![])),
r: Box::pin(stream::iter(vec![])),
extra: None,
size: Some(0).into(),
}

View File

@ -13,17 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures_util::{Stream, TryStreamExt, stream};
use crate::s3::error::Error;
use crate::s3::types::{FromS3Response, NotificationRecords, S3Request};
use crate::s3::utils::take_bucket;
use futures_util::{Stream, StreamExt, TryStreamExt};
use http::HeaderMap;
use std::mem;
use tokio::io::AsyncBufReadExt;
use tokio_util::io::StreamReader;
use crate::s3::utils::take_bucket;
use crate::s3::{
error::Error,
types::{FromS3Response, NotificationRecords, S3Request},
};
/// Response of
/// [listen _bucket_notification()](crate::s3::client::Client::listen_bucket_notification)
@ -54,31 +49,48 @@ impl FromS3Response
let mut resp = resp?;
let headers: HeaderMap = mem::take(resp.headers_mut());
let stream_reader = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other));
// A simple stateful decoder that buffers bytes and yields complete lines
let byte_stream = resp.bytes_stream(); // This is a futures::Stream<Item = Result<Bytes, reqwest::Error>>
let record_stream = Box::pin(stream::unfold(
stream_reader,
move |mut reader| async move {
loop {
let mut line = String::new();
return match reader.read_line(&mut line).await {
Ok(n) => {
if n == 0 {
return None;
}
let s = line.trim();
if s.is_empty() {
continue;
}
let records_res: Result<NotificationRecords, Error> =
serde_json::from_str(s).map_err(|e| e.into());
Some((records_res, reader))
}
Err(e) => Some((Err(e.into()), reader)),
};
let line_stream = Box::pin(async_stream::try_stream! {
let mut buf = Vec::new();
let mut cursor = 0;
let mut stream = byte_stream.map_err(Error::from).boxed();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buf.extend_from_slice(&chunk);
while let Some(pos) = buf[cursor..].iter().position(|&b| b == b'\n') {
let end = cursor + pos;
let line_bytes = &buf[..end];
let line = std::str::from_utf8(line_bytes)?.trim();
if !line.is_empty() {
let parsed: NotificationRecords = serde_json::from_str(line)?;
yield parsed;
}
cursor = end + 1;
}
},
));
// Shift buffer left if needed
if cursor > 0 {
buf.drain(..cursor);
cursor = 0;
}
}
// Drain the remaining buffer if not empty
if !buf.is_empty() {
let line = std::str::from_utf8(&buf)?.trim();
if !line.is_empty() {
let parsed: NotificationRecords = serde_json::from_str(line)?;
yield parsed;
}
}
});
Ok((
ListenBucketNotificationResponse {
@ -86,7 +98,7 @@ impl FromS3Response
region: req.inner_region,
bucket: take_bucket(req.bucket)?,
},
Box::new(record_stream),
Box::new(line_stream),
))
}
}

View File

@ -87,16 +87,16 @@ impl S3Request {
self
}
fn compute_inner_region(&self) -> Result<String, Error> {
async fn compute_inner_region(&self) -> Result<String, Error> {
Ok(match &self.bucket {
Some(b) => self.client.get_region_cached(b, &self.region)?,
Some(b) => self.client.get_region_cached(b, &self.region).await?,
None => DEFAULT_REGION.to_string(),
})
}
/// Execute the request, returning the response. Only used in [`S3Api::send()`]
pub async fn execute(&mut self) -> Result<reqwest::Response, Error> {
self.inner_region = self.compute_inner_region()?;
self.inner_region = self.compute_inner_region().await?;
self.client
.execute(
self.method.clone(),
@ -222,7 +222,7 @@ pub trait S3Api: ToS3Request {
/// or an error if the request failed at any stage.
///
async fn send(self) -> Result<Self::S3Response, Error> {
let mut req = self.to_s3request()?;
let mut req: S3Request = self.to_s3request()?;
let resp: Result<reqwest::Response, Error> = req.execute().await;
Self::S3Response::from_s3response(req, resp).await
}

View File

@ -69,10 +69,10 @@ async fn create_object_helper(
}
/// Append to the end of an existing object (happy flow)
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_0() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -124,10 +124,10 @@ async fn append_object_0() {
}
/// Append to the beginning of an existing object (happy flow)
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_1() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -178,10 +178,10 @@ async fn append_object_1() {
}
/// Append to the middle of an existing object (error InvalidWriteOffset)
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_2() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -214,10 +214,10 @@ async fn append_object_2() {
}
/// Append beyond the size of an existing object (error InvalidWriteOffset)
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_3() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -250,10 +250,10 @@ async fn append_object_3() {
}
/// Append to the beginning/end of a non-existing object (happy flow)
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_4() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -301,10 +301,10 @@ async fn append_object_4() {
}
/// Append beyond the size of a non-existing object (error NoSuchKey)
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_5() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -331,10 +331,10 @@ async fn append_object_5() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_content_0() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -381,10 +381,10 @@ async fn append_object_content_0() {
assert_eq!(resp.object_size, size * 2);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_content_1() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -433,10 +433,10 @@ async fn append_object_content_1() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_content_2() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}
@ -484,10 +484,10 @@ async fn append_object_content_2() {
}
/// Test sending AppendObject across async tasks.
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn append_object_content_3() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}

View File

@ -20,7 +20,7 @@ use minio::s3::types::S3Api;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_bucket_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_create() {
let ctx = TestContext::new_from_env();
let bucket_name = rand_bucket_name();
@ -48,7 +48,7 @@ async fn bucket_create() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_delete() {
let ctx = TestContext::new_from_env();
let bucket_name = rand_bucket_name();

View File

@ -20,7 +20,7 @@ use minio::s3::response::{
use minio::s3::types::{S3Api, SseConfig};
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_encryption() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -18,7 +18,7 @@ use minio::s3::response::{BucketExistsResponse, DeleteBucketResponse};
use minio::s3::types::S3Api;
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_exists() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -23,7 +23,7 @@ use minio::s3::types::S3Api;
use minio_common::example::create_bucket_lifecycle_config_examples;
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_lifecycle() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -23,9 +23,14 @@ use minio_common::test_context::TestContext;
const SQS_ARN: &str = "arn:minio:sqs::miniojavatest:webhook";
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn test_bucket_notification() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
let config: NotificationConfig = create_bucket_notification_config_example();

View File

@ -21,7 +21,7 @@ use minio::s3::types::S3Api;
use minio_common::example::create_bucket_policy_config_example;
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_policy() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -26,10 +26,10 @@ use minio_common::example::{
};
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_replication_s3() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}
@ -134,11 +134,11 @@ async fn bucket_replication_s3() {
//println!("response of getting replication: resp={:?}", resp);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_replication_s3express() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}

View File

@ -22,10 +22,10 @@ use minio::s3::types::S3Api;
use minio_common::example::create_tags_example;
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_tags_s3() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}
@ -73,10 +73,10 @@ async fn bucket_tags_s3() {
assert_eq!(resp.region, DEFAULT_REGION);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_tags_s3express() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}

View File

@ -20,10 +20,10 @@ use minio::s3::response::{GetBucketVersioningResponse, PutBucketVersioningRespon
use minio::s3::types::S3Api;
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_versioning_s3() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}
@ -70,10 +70,10 @@ async fn bucket_versioning_s3() {
assert_eq!(resp.region, DEFAULT_REGION);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn bucket_versioning_s3express() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}

View File

@ -19,7 +19,7 @@ use minio::s3::types::S3Api;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn get_object() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -19,7 +19,7 @@ use minio::s3::response::GetPresignedObjectUrlResponse;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn get_presigned_object_url() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -19,7 +19,7 @@ use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use std::collections::HashMap;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn get_presigned_post_form_data() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -18,7 +18,7 @@ use minio::s3::types::S3Api;
use minio_common::cleanup_guard::CleanupGuard;
use minio_common::test_context::TestContext;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn list_buckets() {
const N_BUCKETS: usize = 3;
let ctx = TestContext::new_from_env();

View File

@ -13,12 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_std::stream::StreamExt;
use minio::s3::response::PutObjectContentResponse;
use minio::s3::types::ToStream;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use std::collections::HashSet;
use tokio_stream::StreamExt;
async fn list_objects(
use_api_v1: bool,
@ -37,7 +37,7 @@ async fn list_objects(
}
let ctx = TestContext::new_from_env();
let is_express = ctx.client.is_minio_express();
let is_express = ctx.client.is_minio_express().await;
if is_express && !express {
println!("Skipping test because it is running in MinIO Express mode");
return;
@ -97,29 +97,29 @@ async fn list_objects(
assert_eq!(names_set_after, names_set_before);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn list_objects_v1_no_versions() {
list_objects(true, false, false, 5, 5).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn list_objects_v1_with_versions() {
list_objects(true, true, false, 5, 5).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn list_objects_v2_no_versions() {
list_objects(false, false, false, 5, 5).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn list_objects_v2_with_versions() {
list_objects(false, true, false, 5, 5).await;
}
/// Test for S3-Express: List objects with S3-Express are only supported with V2 API, without
/// versions, and yield unsorted results.
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn list_objects_express() {
list_objects(false, false, true, 5, 5).await;
}

View File

@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_std::stream::StreamExt;
use async_std::task;
use minio::s3::builders::ObjectContent;
use minio::s3::response::PutObjectContentResponse;
@ -21,9 +22,8 @@ use minio_common::rand_src::RandSrc;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn listen_bucket_notification() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -20,7 +20,7 @@ use minio_common::rand_src::RandSrc;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn compose_object() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -20,10 +20,10 @@ use minio_common::rand_src::RandSrc;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn copy_object() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}

View File

@ -25,10 +25,10 @@ use minio_common::cleanup_guard::CleanupGuard;
use minio_common::test_context::TestContext;
use minio_common::utils::{rand_bucket_name, rand_object_name};
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn object_legal_hold_s3() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}

View File

@ -22,10 +22,10 @@ use minio_common::cleanup_guard::CleanupGuard;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_bucket_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn object_lock_config() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}

View File

@ -22,7 +22,7 @@ use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use tokio::sync::mpsc;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -53,7 +53,7 @@ async fn put_object() {
assert_eq!(resp.size, size);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_multipart() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -81,7 +81,7 @@ async fn put_object_multipart() {
assert_eq!(resp.size as u64, size);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_content() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -146,7 +146,7 @@ async fn put_object_content() {
}
/// Test sending ObjectContent across async tasks.
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_content_2() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -13,14 +13,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_std::stream::StreamExt;
use minio::s3::builders::ObjectToDelete;
use minio::s3::response::PutObjectContentResponse;
use minio::s3::types::ToStream;
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use tokio_stream::StreamExt;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn remove_objects() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -26,10 +26,10 @@ use minio_common::rand_src::RandSrc;
use minio_common::test_context::TestContext;
use minio_common::utils::{rand_bucket_name, rand_object_name};
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn object_retention() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}
@ -66,7 +66,7 @@ async fn object_retention() {
//assert_eq!(resp.etag, "");
let retain_until_date = utc_now() + chrono::Duration::days(1);
let obj_resp: PutObjectRetentionResponse = ctx
let resp: PutObjectRetentionResponse = ctx
.client
.put_object_retention(&bucket_name, &object_name)
.retention_mode(Some(RetentionMode::GOVERNANCE))
@ -74,10 +74,10 @@ async fn object_retention() {
.send()
.await
.unwrap();
assert_eq!(obj_resp.bucket, bucket_name);
assert_eq!(obj_resp.object, object_name);
assert_eq!(obj_resp.version_id, None);
assert_eq!(obj_resp.region, DEFAULT_REGION);
assert_eq!(resp.bucket, bucket_name);
assert_eq!(resp.object, object_name);
assert_eq!(resp.version_id, None);
assert_eq!(resp.region, DEFAULT_REGION);
let resp: GetObjectRetentionResponse = ctx
.client

View File

@ -25,10 +25,10 @@ use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use std::collections::HashMap;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn object_tags() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}

View File

@ -24,7 +24,7 @@ use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
use tokio::sync::mpsc;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -78,7 +78,7 @@ async fn put_object() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_multipart() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -119,7 +119,7 @@ async fn put_object_multipart() {
assert_eq!(resp.version_id, None);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_content_1() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -162,7 +162,7 @@ async fn put_object_content_1() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_content_2() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;
@ -202,7 +202,7 @@ async fn put_object_content_2() {
}
/// Test sending PutObject across async tasks.
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn put_object_content_3() {
let ctx = TestContext::new_from_env();
let (bucket_name, _cleanup) = ctx.create_bucket_helper().await;

View File

@ -20,10 +20,10 @@ use minio_common::example::{create_select_content_data, create_select_content_re
use minio_common::test_context::TestContext;
use minio_common::utils::rand_object_name;
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn select_object_content_s3() {
let ctx = TestContext::new_from_env();
if ctx.client.is_minio_express() {
if ctx.client.is_minio_express().await {
println!("Skipping test because it is running in MinIO Express mode");
return;
}
@ -61,10 +61,10 @@ async fn select_object_content_s3() {
assert_eq!(got, select_data);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn select_object_content_express() {
let ctx = TestContext::new_from_env();
if !ctx.client.is_minio_express() {
if !ctx.client.is_minio_express().await {
println!("Skipping test because it is NOT running in MinIO Express mode");
return;
}

View File

@ -90,12 +90,12 @@ async fn upload_download_object(size: u64) {
fs::remove_file(&filename).unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn upload_download_object_1() {
upload_download_object(16).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
#[tokio::test(flavor = "multi_thread")]
async fn upload_download_object_2() {
upload_download_object(16 + 5 * 1024 * 1024).await;
}