Merge pull request #9 from grumbach/master

Add put_object_req method
This commit is contained in:
Daniel Valdivia 2021-02-05 11:19:15 -08:00 committed by GitHub
commit 8aa4d412ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 98 deletions

View File

@ -16,3 +16,65 @@
*/
pub mod minio;
#[cfg(test)]
mod tests {
use futures::{future::Future, stream::Stream};
use hyper::rt;
use log::debug;
use minio::BucketInfo;
use super::*;
fn get_local_default_server() -> minio::Client {
match minio::Client::new("http://localhost:9000") {
Ok(mut c) => {
c.set_credentials(minio::Credentials::new("minio", "minio123"));
c
}
Err(_) => panic!("could not make local client"),
}
}
#[test]
fn test_lib_functions() {
rt::run(rt::lazy(|| {
let c = minio::Client::get_play_client();
let bucket = "aaaa";
c.put_object_req(bucket, "hhhhhhhhhh", vec![], "object content".as_bytes().to_vec())
.and_then(|g| {
debug!("object: {} {} {:?}", g.object_size, g.etag, g.content_type);
g.get_object_stream().concat2()
})
.map(|c| debug!("get obj res: {:?}", c))
.map_err(|c| debug!("err res: {:?}", c))
.map(|_| {})
}));
rt::run(rt::lazy(|| {
let c = minio::Client::get_play_client();
let bucket = "aaaa";
c.get_object_req(bucket, "hhhhhhhhhh", vec![])
.and_then(|g| {
debug!("object: {} {} {:?}", g.object_size, g.etag, g.content_type);
g.get_object_stream().concat2()
})
.map(|c| debug!("get obj res: {:?}", c))
.map_err(|c| debug!("err res: {:?}", c))
.map(|_| {})
}));
rt::run(rt::lazy(|| {
let c = minio::Client::get_play_client();
let bucket = "aaaa";
c.delete_bucket(bucket)
.map(|_| debug!("Deleted!"))
.map_err(|err| debug!("del err: {:?}", err))
.map(|_| {})
}));
}
}

View File

@ -1,93 +0,0 @@
/*
* MinIO Rust Library for Amazon S3 Compatible Cloud Storage
* Copyright 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use futures::{future::Future, stream::Stream};
use hyper::rt;
use minio::BucketInfo;
mod minio;
fn get_local_default_server() -> minio::Client {
match minio::Client::new("http://localhost:9000") {
Ok(mut c) => {
c.set_credentials(minio::Credentials::new("minio", "minio123"));
c
}
Err(_) => panic!("could not make local client"),
}
}
fn main() {
rt::run(rt::lazy(|| {
// let c = get_local_default_server();
let c = minio::Client::get_play_client();
let bucket = "yyy";
let region_req = c
.get_bucket_location(bucket)
.map(|res| println!("{}", res.to_string()))
.map_err(|err| println!("{:?}", err));
let del_req = c
.delete_bucket(bucket)
.map(|_| println!("Deleted!"))
.map_err(|err| println!("del err: {:?}", err));
let buc_exists_req = c
.bucket_exists(bucket)
.map(move |e| println!("Bucket {} exists: {}", bucket, e))
.map_err(|err| println!("exists err: {:?}", err));
let make_bucket_req = c
.make_bucket(bucket)
.map(move |_| println!("Bucket {} created", bucket))
.map_err(move |err| println!("Bucket create for {} failed with {:?}", bucket, err));
let download_req = c
.get_object_req(bucket, "issue", vec![])
.and_then(|g| {
println!("issue: {} {} {:?}", g.object_size, g.etag, g.content_type);
g.get_object_stream().concat2()
})
.map(|c| println!("get obj res: {:?}", c))
.map_err(|c| println!("err res: {:?}", c));
let list_buckets_req = c
.list_buckets()
.map(|buckets| {
println!(
"{:?}",
buckets
.iter()
.map(|bucket: &BucketInfo| bucket.name.clone())
.collect::<Vec<String>>()
)
})
.map_err(|err| println!("{:?}", err));
let list_objects_req = c
.list_objects(bucket, None, None, None, None)
.map(|l_obj_resp| println!("{:?} {:?}", l_obj_resp, l_obj_resp.object_infos.len()))
.map_err(|err| println!("{:?}", err));
del_req
.join5(make_bucket_req, region_req, buc_exists_req, download_req)
.map(|_| ())
.and_then(|_| list_buckets_req)
.then(|_| list_objects_req)
}));
}

View File

@ -180,9 +180,12 @@ impl Client {
s3_req.body = body;
let sign_hdrs = sign::sign_v4(&s3_req, creds, region);
debug!("signout: {:?}", sign_hdrs);
api::mk_request(&s3_req, &server_addr, &sign_hdrs)
api::mk_request(s3_req, &server_addr, &sign_hdrs)
})
.and_then(move |req| {
debug!("{:?}", req);
conn_client.make_req(req).map_err(|e| Err::HyperErr(e))
})
.and_then(move |req| conn_client.make_req(req).map_err(|e| Err::HyperErr(e)))
.and_then(|resp| {
let st = resp.status();
if st.is_success() {
@ -303,6 +306,35 @@ impl Client {
.and_then(GetObjectResp::new)
}
pub fn put_object_req(
&self,
bucket_name: &str,
key: &str,
get_obj_opts: Vec<(HeaderName, HeaderValue)>,
data: Vec<u8>,
) -> impl Future<Item = GetObjectResp, Error = Err> {
let mut h = HeaderMap::new();
get_obj_opts
.iter()
.map(|(x, y)| (x.clone(), y.clone()))
.for_each(|(k, v)| {
h.insert(k, v);
});
let s3_req = S3Req {
method: Method::PUT,
bucket: Some(bucket_name.to_string()),
object: Some(key.to_string()),
headers: h,
query: Values::new(),
body: Body::from(data.clone()),
ts: time::now_utc(),
};
self.signed_req_future(s3_req, Ok(Body::from(data)))
.and_then(GetObjectResp::new)
}
pub fn make_bucket(&self, bucket_name: &str) -> impl Future<Item = (), Error = Err> {
let xml_body_res = xml::get_mk_bucket_body();
let bucket = bucket_name.clone().to_string();

View File

@ -20,14 +20,14 @@ use hyper::{header::HeaderName, header::HeaderValue, Body, Request};
use log::debug;
pub fn mk_request(
r: &minio::S3Req,
r: minio::S3Req,
svr_str: &str,
sign_hdrs: &Vec<(HeaderName, HeaderValue)>,
) -> Result<Request<Body>, minio::Err> {
let mut request = Request::builder();
let uri_str = svr_str.trim_end_matches('/');
debug!("uri_str: {}", uri_str);
let upd_uri = format!("{}{}?{}", uri_str, r.mk_path(), r.mk_query());
let upd_uri = format!("{}{}?{}", uri_str, &r.mk_path(), &r.mk_query());
debug!("upd_uri: {}", upd_uri);
request.uri(&upd_uri).method(&r.method);
@ -40,6 +40,6 @@ pub fn mk_request(
request.header(hdr.0, hdr.1);
}
request
.body(Body::empty())
.body(r.body)
.map_err(|err| minio::Err::HttpErr(err))
}