Store content files into the S3

This commit is contained in:
Kerollmops 2023-09-11 18:17:22 +02:00
parent 719fdd701b
commit a53a0fdb77
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 75 additions and 24 deletions

View File

@ -22,20 +22,6 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct FileStore { pub struct FileStore {
path: PathBuf, path: PathBuf,
@ -146,6 +132,20 @@ impl File {
} }
} }
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io::Write; use std::io::Write;

View File

@ -198,6 +198,35 @@ impl Batch {
| IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid), | IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
} }
} }
/// Return the content fields uuids associated with this batch.
pub fn content_uuids(&self) -> Vec<Uuid> {
match self {
Batch::TaskCancelation { .. }
| Batch::TaskDeletion(_)
| Batch::Dump(_)
| Batch::IndexCreation { .. }
| Batch::IndexDocumentDeletionByFilter { .. }
| Batch::IndexUpdate { .. }
| Batch::SnapshotCreation(_)
| Batch::IndexDeletion { .. }
| Batch::IndexSwap { .. } => vec![],
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { operations, .. } => operations
.iter()
.flat_map(|op| match op {
DocumentOperation::Add(uuid) => Some(*uuid),
DocumentOperation::Delete(_) => None,
})
.collect(),
IndexOperation::DocumentDeletion { .. }
| IndexOperation::Settings { .. }
| IndexOperation::DocumentClear { .. }
| IndexOperation::SettingsAndDocumentOperation { .. }
| IndexOperation::DocumentClearAndSetting { .. } => vec![],
},
}
}
} }
impl IndexOperation { impl IndexOperation {

View File

@ -35,7 +35,6 @@ pub type TaskId = u32;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::fs::File; use std::fs::File;
use std::ops::{Bound, RangeBounds}; use std::ops::{Bound, RangeBounds};
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
@ -59,7 +58,7 @@ use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use s3::Bucket; use s3::Bucket;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use tempfile::{NamedTempFile, TempDir}; use tempfile::TempDir;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@ -281,7 +280,7 @@ pub struct IndexSchedulerOptions {
/// zookeeper client /// zookeeper client
pub zookeeper: Option<Arc<ZooKeeper>>, pub zookeeper: Option<Arc<ZooKeeper>>,
/// S3 bucket /// S3 bucket
pub s3: Option<Bucket>, pub s3: Option<Arc<Bucket>>,
} }
/// Structure which holds meilisearch's indexes and schedules the tasks /// Structure which holds meilisearch's indexes and schedules the tasks
@ -290,6 +289,7 @@ pub struct IndexSchedulerOptions {
pub struct IndexScheduler { pub struct IndexScheduler {
inner: Arc<RwLock<Option<IndexSchedulerInner>>>, inner: Arc<RwLock<Option<IndexSchedulerInner>>>,
zookeeper: Option<Arc<ZooKeeper>>, zookeeper: Option<Arc<ZooKeeper>>,
pub s3: Option<Arc<Bucket>>,
wake_up: Arc<SignalEvent>, wake_up: Arc<SignalEvent>,
} }
@ -300,6 +300,7 @@ impl IndexScheduler {
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> { ) -> Result<Self> {
let s3 = options.s3.clone();
let inner = IndexSchedulerInner::new( let inner = IndexSchedulerInner::new(
options, options,
#[cfg(test)] #[cfg(test)]
@ -334,6 +335,7 @@ impl IndexScheduler {
let this = IndexScheduler { let this = IndexScheduler {
zookeeper: inner.zookeeper.clone(), zookeeper: inner.zookeeper.clone(),
s3,
wake_up: inner.wake_up.clone(), wake_up: inner.wake_up.clone(),
inner: Arc::new(RwLock::new(Some(inner))), inner: Arc::new(RwLock::new(Some(inner))),
}; };
@ -1242,6 +1244,16 @@ impl IndexSchedulerInner {
Some(batch) => batch, Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal), None => return Ok(TickOutcome::WaitForSignal),
}; };
if let Some(s3) = &self.options.s3 {
for uuid in batch.content_uuids() {
// TODO use a real UUIDv4
let (_, file) = self.file_store.new_update_with_uuid(uuid.as_u128())?;
s3.get_object_to_writer(&format!("/update-files/{}", uuid), &mut &*file).unwrap();
file.persist()?;
}
}
let index_uid = batch.index_uid().map(ToOwned::to_owned); let index_uid = batch.index_uid().map(ToOwned::to_owned);
drop(rtxn); drop(rtxn);

View File

@ -245,13 +245,15 @@ fn open_or_create_database_unchecked(
instance_features, instance_features,
zookeeper: zookeeper.clone(), zookeeper: zookeeper.clone(),
s3: opt.s3_url.as_ref().map(|url| { s3: opt.s3_url.as_ref().map(|url| {
Bucket::new( Arc::new(
"test-rust-s3", Bucket::new(
Region::Custom { region: "eu-central-1".to_owned(), endpoint: url.clone() }, "test-rust-s3",
Credentials::default().unwrap(), Region::Custom { region: "eu-central-1".to_owned(), endpoint: url.clone() },
Credentials::default().unwrap(),
)
.unwrap()
.with_path_style(),
) )
.unwrap()
.with_path_style()
}), }),
})) }))
.map_err(anyhow::Error::from); .map_err(anyhow::Error::from);

View File

@ -1,4 +1,4 @@
use std::io::ErrorKind; use std::io::{BufReader, ErrorKind};
use actix_web::http::header::CONTENT_TYPE; use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Data; use actix_web::web::Data;
@ -400,6 +400,7 @@ async fn document_addition(
} }
let read_file = buffer.into_inner().into_std().await; let read_file = buffer.into_inner().into_std().await;
let s3 = index_scheduler.s3.clone();
let documents_count = tokio::task::spawn_blocking(move || { let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = match format { let documents_count = match format {
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
@ -408,8 +409,15 @@ async fn document_addition(
} }
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?, PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
}; };
if let Some(s3) = s3 {
let mut reader = BufReader::new(&*update_file);
s3.put_object_stream(&mut reader, format!("/update-files/{}", uuid)).unwrap();
}
// we NEED to persist the file here because we moved the `udpate_file` in another task. // we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist()?; update_file.persist()?;
Ok(documents_count) Ok(documents_count)
}) })
.await; .await;