diff --git a/file-store/src/lib.rs b/file-store/src/lib.rs
index 75db9bb5f..8f1b9324c 100644
--- a/file-store/src/lib.rs
+++ b/file-store/src/lib.rs
@@ -22,20 +22,6 @@ pub enum Error {
 
 pub type Result<T> = std::result::Result<T, Error>;
 
-impl Deref for File {
-    type Target = NamedTempFile;
-
-    fn deref(&self) -> &Self::Target {
-        &self.file
-    }
-}
-
-impl DerefMut for File {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.file
-    }
-}
-
 #[derive(Clone, Debug)]
 pub struct FileStore {
     path: PathBuf,
@@ -146,6 +132,20 @@ impl File {
     }
 }
 
+impl Deref for File {
+    type Target = NamedTempFile;
+
+    fn deref(&self) -> &Self::Target {
+        &self.file
+    }
+}
+
+impl DerefMut for File {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.file
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::io::Write;
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 8237c8540..6aa208559 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -198,6 +198,35 @@ impl Batch {
             | IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
         }
     }
+
+    /// Return the uuids of the content files associated with this batch.
+    pub fn content_uuids(&self) -> Vec<Uuid> {
+        match self {
+            Batch::TaskCancelation { .. }
+            | Batch::TaskDeletion(_)
+            | Batch::Dump(_)
+            | Batch::IndexCreation { .. }
+            | Batch::IndexDocumentDeletionByFilter { .. }
+            | Batch::IndexUpdate { .. }
+            | Batch::SnapshotCreation(_)
+            | Batch::IndexDeletion { .. }
+            | Batch::IndexSwap { .. } => vec![],
+            Batch::IndexOperation { op, .. } => match op {
+                IndexOperation::DocumentOperation { operations, .. } => operations
+                    .iter()
+                    .flat_map(|op| match op {
+                        DocumentOperation::Add(uuid) => Some(*uuid),
+                        DocumentOperation::Delete(_) => None,
+                    })
+                    .collect(),
+                IndexOperation::DocumentDeletion { .. }
+                | IndexOperation::Settings { .. }
+                | IndexOperation::DocumentClear { .. }
+                | IndexOperation::SettingsAndDocumentOperation { .. }
+                | IndexOperation::DocumentClearAndSetting { .. } => vec![],
+            },
+        }
+    }
 }
 
 impl IndexOperation {
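Note on `content_uuids()`: only `DocumentOperation::Add` contributes a uuid, because only additions carry a payload stored as a content file; deletions reference documents by their external ids. For reference, here is a simplified sketch of the operation enum shape the `flat_map` above matches on — the real definition lives in `index-scheduler/src/batch.rs` and may differ in detail:

```rust
use uuid::Uuid;

// Simplified sketch, not the exact definition: an `Add` references the
// content file holding the payload by uuid, while a `Delete` carries
// external document ids and therefore has no content file to mirror.
pub enum DocumentOperation {
    Add(Uuid),
    Delete(Vec<String>),
}
```

Because an `Add` stores only the uuid, the scheduler can fetch the actual payload from object storage lazily, right before the batch runs (see the tick loop in the next file).
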
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 18dea48b0..582b87f8d 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -35,7 +35,6 @@ pub type TaskId = u32;
 use std::collections::{BTreeMap, HashMap};
 use std::fs::File;
 use std::ops::{Bound, RangeBounds};
-use std::os::fd::AsRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering::Relaxed;
@@ -59,7 +58,7 @@ use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
 use roaring::RoaringBitmap;
 use s3::Bucket;
 use synchronoise::SignalEvent;
-use tempfile::{NamedTempFile, TempDir};
+use tempfile::TempDir;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@@ -281,7 +280,7 @@ pub struct IndexSchedulerOptions {
     /// zookeeper client
     pub zookeeper: Option<Arc<ZooKeeper>>,
     /// S3 bucket
-    pub s3: Option<Bucket>,
+    pub s3: Option<Arc<Bucket>>,
 }
 
 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -290,6 +289,7 @@ pub struct IndexSchedulerOptions {
 pub struct IndexScheduler {
     inner: Arc<RwLock<Option<IndexSchedulerInner>>>,
     zookeeper: Option<Arc<ZooKeeper>>,
+    pub s3: Option<Arc<Bucket>>,
     wake_up: Arc<SignalEvent>,
 }
@@ -300,6 +300,7 @@ impl IndexScheduler {
         #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
     ) -> Result<Self> {
+        let s3 = options.s3.clone();
         let inner = IndexSchedulerInner::new(
             options,
             #[cfg(test)]
@@ -334,6 +335,7 @@ impl IndexScheduler {
 
         let this = IndexScheduler {
             zookeeper: inner.zookeeper.clone(),
+            s3,
             wake_up: inner.wake_up.clone(),
             inner: Arc::new(RwLock::new(Some(inner))),
         };
@@ -1242,6 +1244,16 @@ impl IndexSchedulerInner {
             Some(batch) => batch,
             None => return Ok(TickOutcome::WaitForSignal),
         };
+
+        if let Some(s3) = &self.options.s3 {
+            for uuid in batch.content_uuids() {
+                // TODO use a real UUIDv4
+                let (_, file) = self.file_store.new_update_with_uuid(uuid.as_u128())?;
+                s3.get_object_to_writer(&format!("/update-files/{}", uuid), &mut &*file).unwrap();
+                file.persist()?;
+            }
+        }
+
         let index_uid = batch.index_uid().map(ToOwned::to_owned);
         drop(rtxn);
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index b7e1be43a..4a4cdcc08 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -245,13 +245,15 @@ fn open_or_create_database_unchecked(
         instance_features,
         zookeeper: zookeeper.clone(),
         s3: opt.s3_url.as_ref().map(|url| {
-            Bucket::new(
-                "test-rust-s3",
-                Region::Custom { region: "eu-central-1".to_owned(), endpoint: url.clone() },
-                Credentials::default().unwrap(),
+            Arc::new(
+                Bucket::new(
+                    "test-rust-s3",
+                    Region::Custom { region: "eu-central-1".to_owned(), endpoint: url.clone() },
+                    Credentials::default().unwrap(),
+                )
+                .unwrap()
+                .with_path_style(),
             )
-            .unwrap()
-            .with_path_style()
         }),
     }))
     .map_err(anyhow::Error::from);
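Note on the S3 plumbing: the bucket is now wrapped in an `Arc` so the same handle can be cloned cheaply into both the `IndexScheduler` front type and the HTTP routes, and the scheduler's tick mirrors every content file down from the bucket before processing a batch. Below is a minimal sketch of the round trip this relies on, assuming the `rust-s3` crate's blocking (`sync`) API, which the synchronous calls in the patch imply; the endpoint URL is an assumed local stand-in, while the bucket name, region, and path-style flag mirror the hard-coded values above:

```rust
use std::fs::File;
use std::io::BufReader;

use s3::creds::Credentials;
use s3::{Bucket, Region};

fn round_trip() -> Result<(), Box<dyn std::error::Error>> {
    // Mirrors the construction in `open_or_create_database_unchecked`,
    // with a hard-coded endpoint standing in for `opt.s3_url`.
    let bucket = Bucket::new(
        "test-rust-s3",
        Region::Custom {
            region: "eu-central-1".to_owned(),
            endpoint: "http://127.0.0.1:9000".to_owned(), // assumed local endpoint
        },
        Credentials::default()?,
    )?
    .with_path_style();

    // Upload: stream any `Read` impl into an object key.
    let mut reader = BufReader::new(File::open("payload.ndjson")?);
    bucket.put_object_stream(&mut reader, "/update-files/example")?;

    // Download: write the object back out through any `Write` impl.
    let mut writer = File::create("payload.copy.ndjson")?;
    bucket.get_object_to_writer("/update-files/example", &mut writer)?;
    Ok(())
}
```

The download side in the tick writes through `&mut &*file`, which works because the file-store `File` derefs to `NamedTempFile` and `tempfile` implements `Write` for `&NamedTempFile`.
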
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 249aeca4e..e0783812b 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -1,4 +1,4 @@
-use std::io::ErrorKind;
+use std::io::{BufReader, ErrorKind};
 
 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Data;
@@ -400,6 +400,7 @@ async fn document_addition(
     }
 
     let read_file = buffer.into_inner().into_std().await;
+    let s3 = index_scheduler.s3.clone();
     let documents_count = tokio::task::spawn_blocking(move || {
         let documents_count = match format {
             PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
@@ -408,8 +409,15 @@
             }
             PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
         };
+
+        if let Some(s3) = s3 {
+            let mut reader = BufReader::new(&*update_file);
+            s3.put_object_stream(&mut reader, format!("/update-files/{}", uuid)).unwrap();
+        }
+
         // we NEED to persist the file here because we moved the `update_file` into another task.
         update_file.persist()?;
+
         Ok(documents_count)
     })
     .await;
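Note on the upload path: the payload is streamed to the bucket from inside `spawn_blocking`, after being written into the update file and before `persist()`. One subtlety: reading back through `&NamedTempFile` starts at the file's current cursor, so rewinding before the upload is the safe choice. Here is a sketch of that upload extracted as a helper, with an explicit rewind and the `unwrap()` replaced by error propagation — `mirror_to_s3` and its `io::Error` mapping are illustrative, not part of the patch:

```rust
use std::io::{self, BufReader, Seek, SeekFrom};

use file_store::File;
use s3::Bucket;
use uuid::Uuid;

// Illustrative helper (not in the patch): upload one update file under the
// same key scheme the scheduler's tick later downloads from.
fn mirror_to_s3(s3: &Bucket, uuid: Uuid, update_file: &File) -> io::Result<()> {
    // `File` derefs to `NamedTempFile`; rewind so the upload starts at the
    // beginning of the freshly written payload rather than at its end.
    let mut file = update_file.as_file();
    file.seek(SeekFrom::Start(0))?;

    let mut reader = BufReader::new(file);
    s3.put_object_stream(&mut reader, format!("/update-files/{}", uuid))
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
    Ok(())
}
```

The `/update-files/{uuid}` key scheme is what ties the two sides together: the route uploads under the content file's uuid, and the scheduler recreates the local update file from that same key before running the batch.
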