8000 Drastically speed-up S3 snapshot uploads by Kerollmops · Pull Request #5988 · meilisearch/meilisearch · GitHub
[go: up one dir, main page]

Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/index-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ time = { version = "0.3.41", features = [
tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
backoff = { version = "0.4.0", features = ["tokio"] }
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }
futures = "0.3.31"

[dev-dependencies]
big_s = "1.0.2"
Expand Down
82 changes: 46 additions & 36 deletions crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,14 @@ async fn multipart_stream_to_s3(
db_name: String,
reader: std::io::PipeReader,
) -> Result<(), Error> {
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf};
use std::io;
use std::{os::fd::OwnedFd, path::PathBuf};

use bytes::{Bytes, BytesMut};
use reqwest::{Client, Response};
use bytes::BytesMut;
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;
use rusty_s3::S3Action as _;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
use tokio::task::JoinHandle;

let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
Expand Down Expand Up @@ -482,9 +483,7 @@ async fn multipart_stream_to_s3(
// We use this bumpalo for etags strings.
let bump = bumpalo::Bump::new();
let mut etags = Vec::<&str>::new();
let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
s3_max_in_flight_parts.get(),
);
let mut in_flight = FuturesUnordered::new();

// Part numbers start at 1 and cannot be larger than 10k
for part_number in 1u16.. {
Expand All @@ -498,8 +497,21 @@ async fn multipart_stream_to_s3(

// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request");
let resp = join_and_map_error(handle).await?;
let (join_result, buffer): (
Result<reqwest::Result<reqwest::Response>, tokio::task::JoinError>,
bytes::Bytes,
) = in_flight.next().await.expect("At least one in flight request");
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(_) => resp,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
extract_and_append_etag(&bump, &mut etags, resp.headers())?;

let mut buffer = match buffer.try_into_mut() {
Expand All @@ -516,8 +528,6 @@ async fn multipart_stream_to_s3(
// we can continue and send the buffer/part
while buffer.len() < (s3_multipart_part_size as usize / 2) {
// Wait for the pipe to be readable

use std::io;
reader.readable().await?;

match reader.try_read_buf(&mut buffer) {
Expand Down Expand Up @@ -556,11 +566,24 @@ async fn multipart_stream_to_s3(
}
})
});
in_flight.push_back((task, body));

// Wrap the task to return both the result and the buffer
let task_with_buffer = async move { (task.await, body) };
in_flight.push(task_with_buffer);
}

for (handle, _buffer) in in_flight {
let resp = join_and_map_error(handle).await?;
while let Some((join_result, _buffer)) = in_flight.next().await {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(_) => resp,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
}

Expand All @@ -580,16 +603,19 @@ async fn multipart_stream_to_s3(
let body = body.clone();
async move {
match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) if resp.status().is_client_error() => match resp.error_for_status_ref() {
Ok(_) => Ok(resp),
Err(_) => Err(backoff::Error::Permanent(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})),
},
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
}
}
})
.await
.map_err(Error::S3HttpError)?;
.await?;

let status = resp.status();
let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
Expand All @@ -600,22 +626,6 @@ async fn multipart_stream_to_s3(
}
}

/// Awaits a spawned S3 part-upload task and normalizes its outcome into this
/// module's `Result<reqwest::Response>`.
///
/// Transport-level failures from `reqwest` are wrapped in `Error::S3HttpError`;
/// responses with a non-success HTTP status become `Error::S3Error`, carrying
/// the status code and (best effort) the response body text.
#[cfg(unix)]
async fn join_and_map_error(
    join_handle: tokio::task::JoinHandle<Result<reqwest::Response, reqwest::Error>>,
) -> Result<reqwest::Response> {
    // safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
    let request = join_handle.await.unwrap();
    // Connection/timeout/etc. errors (no HTTP response at all) map to S3HttpError.
    let resp = request.map_err(Error::S3HttpError)?;
    // `error_for_status_ref` borrows the response, so on failure we can still
    // consume `resp` to read its body for the error report.
    match resp.error_for_status_ref() {
        Ok(_) => Ok(resp),
        Err(_) => Err(Error::S3Error {
            status: resp.status(),
            // If the body cannot be read, fall back to an empty string rather
            // than masking the original HTTP error with a read error.
            body: resp.text().await.unwrap_or_default(),
        }),
    }
}

#[cfg(unix)]
fn extract_and_append_etag<'b>(
bump: &'b bumpalo::Bump,
Expand Down
Loading
0