diff --git a/Cargo.lock b/Cargo.lock
index d8714827e..4db6ac2f0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2284,6 +2284,7 @@ dependencies = [
  "cargo_toml",
  "clap 4.0.9",
  "crossbeam-channel",
+ "dump",
  "either",
  "env_logger",
  "file-store",
diff --git a/dump/src/lib.rs b/dump/src/lib.rs
index 2bed7d12a..da777b7d6 100644
--- a/dump/src/lib.rs
+++ b/dump/src/lib.rs
@@ -1,8 +1,10 @@
 use meilisearch_types::{
     error::ResponseError,
+    keys::Key,
     milli::update::IndexDocumentsMethod,
     settings::Unchecked,
     tasks::{Details, KindWithContent, Status, Task, TaskId},
+    InstanceUid,
 };
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
@@ -12,7 +14,7 @@ mod reader;
 mod writer;

 pub use error::Error;
-pub use reader::DumpReader;
+pub use reader::{DumpReader, UpdateFile};
 pub use writer::DumpWriter;

 const CURRENT_DUMP_VERSION: Version = Version::V6;
@@ -49,14 +51,13 @@ pub enum Version {
     V6,
 }

-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct TaskDump {
     pub uid: TaskId,
     #[serde(default)]
     pub index_uid: Option<String>,
     pub status: Status,
-    // TODO use our own Kind for the user
     #[serde(rename = "type")]
     pub kind: KindDump,
@@ -82,7 +83,7 @@ pub struct TaskDump {
 }

 // A `Kind` specific version made for the dump. If modified you may break the dump.
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub enum KindDump {
     DocumentImport {
@@ -118,7 +119,9 @@ pub enum KindDump {
         query: String,
         tasks: Vec<TaskId>,
     },
-    DumpExport,
+    DumpExport {
+        dump_uid: String,
+    },
     Snapshot,
 }
@@ -177,7 +180,7 @@ impl From<KindWithContent> for KindDump {
             KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs },
             KindWithContent::CancelTask { tasks } => KindDump::CancelTask { tasks },
             KindWithContent::DeleteTasks { query, tasks } => KindDump::DeleteTasks { query, tasks },
-            KindWithContent::DumpExport { .. } => KindDump::DumpExport,
+            KindWithContent::DumpExport { dump_uid, .. } => KindDump::DumpExport { dump_uid },
             KindWithContent::Snapshot => KindDump::Snapshot,
         }
     }
@@ -206,8 +209,7 @@ pub(crate) mod test {
     use uuid::Uuid;

     use crate::{
-        reader::{self, Document},
-        DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version,
+        reader::Document, DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version,
     };

     pub fn create_test_instance_uid() -> Uuid {
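Since `KindDump` is the on-disk task format ("If modified you may break the dump"), giving `DumpExport` a payload changes the serialized shape of every dump task of that kind, which is why the snapshot hashes below change. A minimal sketch of the new shape as seen from a dependent crate (the uid value is made up; `serde_json` is assumed available as a dev-dependency):

#[cfg(test)]
mod kind_dump_shape {
    use dump::KindDump;
    use serde_json::json;

    #[test]
    fn dump_export_round_trips_with_its_uid() {
        let kind = KindDump::DumpExport {
            dump_uid: "20221013-140408213".to_string(), // made-up uid
        };
        // `rename_all = "camelCase"` on the enum renames the *variant* only;
        // the struct-variant field keeps its snake_case name.
        let value = serde_json::to_value(&kind).unwrap();
        assert_eq!(value, json!({ "dumpExport": { "dump_uid": "20221013-140408213" } }));
        let back: KindDump = serde_json::from_value(value).unwrap();
        assert_eq!(back, kind);
    }
}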
diff --git a/dump/src/reader/compat/v5_to_v6.rs b/dump/src/reader/compat/v5_to_v6.rs
index ced41fc65..4c6390223 100644
--- a/dump/src/reader/compat/v5_to_v6.rs
+++ b/dump/src/reader/compat/v5_to_v6.rs
@@ -116,7 +116,9 @@ impl CompatV5ToV6 {
                         allow_index_creation,
                         settings: settings.into(),
                     },
-                    v5::tasks::TaskContent::Dump { .. } => v6::Kind::DumpExport,
+                    v5::tasks::TaskContent::Dump { uid } => {
+                        v6::Kind::DumpExport { dump_uid: uid }
+                    }
                 },
                 details: task_view.details.map(|details| match details {
                     v5::Details::DocumentAddition {
@@ -412,7 +414,7 @@ pub(crate) mod test {
         // tasks
         let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-        meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0fff3c32487e3d3058d51ed951c1057f");
+        meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"84d5b8eb31735d643483fcee28080edf");
         assert_eq!(update_files.len(), 22);
         assert!(update_files[0].is_none()); // the dump creation
         assert!(update_files[1].is_some()); // the enqueued document addition
diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs
index daa7df1f9..e549010a6 100644
--- a/dump/src/reader/mod.rs
+++ b/dump/src/reader/mod.rs
@@ -203,7 +203,7 @@ pub(crate) mod test {
         // tasks
         let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-        meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0fff3c32487e3d3058d51ed951c1057f");
+        meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"84d5b8eb31735d643483fcee28080edf");
         assert_eq!(update_files.len(), 22);
         assert!(update_files[0].is_none()); // the dump creation
         assert!(update_files[1].is_some()); // the enqueued document addition
diff --git a/dump/src/reader/v6/mod.rs b/dump/src/reader/v6/mod.rs
index 008a5ad27..b65034d80 100644
--- a/dump/src/reader/v6/mod.rs
+++ b/dump/src/reader/v6/mod.rs
@@ -109,7 +109,7 @@ impl V6Reader {
         &mut self,
     ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
         Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
-            let task: Task = serde_json::from_str(&line?)?;
+            let task: Task = serde_json::from_str(&dbg!(line?)).unwrap();

             let update_file_path = self
                 .dump
@@ -121,7 +121,8 @@
             if update_file_path.exists() {
                 Ok((
                     task,
-                    Some(Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>),
+                    Some(Box::new(UpdateFile::new(&update_file_path).unwrap())
+                        as Box<super::UpdateFile>),
                 ))
             } else {
                 Ok((task, None))
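The `UpdateFile` re-export in `dump/src/lib.rs` exists so code outside the crate can name the type of the optional per-task payload yielded by `DumpReader::tasks()`. A consumer sketch (error handling kept minimal, the helper name is ours):

use std::fs::File;
use std::path::Path;

// Count how many dumped tasks carry a documents payload.
fn count_tasks_with_payload(dump_path: &Path) -> anyhow::Result<usize> {
    let mut dump_reader = dump::DumpReader::open(File::open(dump_path)?)?;
    let mut with_payload = 0;
    for entry in dump_reader.tasks() {
        // `dump::UpdateFile` is the re-export this diff introduces.
        let (_task, update_file): (dump::TaskDump, Option<Box<dump::UpdateFile>>) = entry?;
        if update_file.is_some() {
            with_payload += 1;
        }
    }
    Ok(with_payload)
}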
diff --git a/dump/src/writer.rs b/dump/src/writer.rs
index 4f0d20754..d588022a5 100644
--- a/dump/src/writer.rs
+++ b/dump/src/writer.rs
@@ -71,24 +71,26 @@ impl DumpWriter {
 }

 pub struct KeyWriter {
-    file: File,
+    keys: BufWriter<File>,
 }

 impl KeyWriter {
     pub(crate) fn new(path: PathBuf) -> Result<KeyWriter> {
-        let file = File::create(path.join("keys.jsonl"))?;
-        Ok(KeyWriter { file })
+        let keys = File::create(path.join("keys.jsonl"))?;
+        Ok(KeyWriter {
+            keys: BufWriter::new(keys),
+        })
     }

     pub fn push_key(&mut self, key: &Key) -> Result<()> {
-        self.file.write_all(&serde_json::to_vec(key)?)?;
-        self.file.write_all(b"\n")?;
+        self.keys.write_all(&serde_json::to_vec(key)?)?;
+        self.keys.write_all(b"\n")?;
         Ok(())
     }
 }

 pub struct TaskWriter {
-    queue: File,
+    queue: BufWriter<File>,
     update_files: PathBuf,
 }
@@ -101,7 +103,7 @@ impl TaskWriter {
         std::fs::create_dir(&update_files)?;

         Ok(TaskWriter {
-            queue,
+            queue: BufWriter::new(queue),
             update_files,
         })
     }
@@ -111,6 +113,7 @@
     pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
         self.queue.write_all(&serde_json::to_vec(task)?)?;
         self.queue.write_all(b"\n")?;
+        self.queue.flush()?;

         Ok(UpdateFile::new(
             self.update_files.join(format!("{}.jsonl", task.uid)),
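Wrapping the raw `File`s in `BufWriter` batches the many small `write_all` calls, and the explicit `flush()` in `push_task` then guarantees each task line is on disk before the matching update file is created. The pattern in isolation, as a generic sketch (not dump-specific):

use std::fs::File;
use std::io::{BufWriter, Write};

// Append one serde-serializable record as a JSONL line: buffer the two small
// writes, then flush so the complete line hits the file immediately.
fn append_jsonl<T: serde::Serialize>(
    out: &mut BufWriter<File>,
    record: &T,
) -> std::io::Result<()> {
    out.write_all(&serde_json::to_vec(record).map_err(std::io::Error::from)?)?;
    out.write_all(b"\n")?;
    out.flush() // without this, the line may sit in the buffer until drop
}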
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index bb7a3613c..6075c6145 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -736,7 +736,7 @@ impl IndexScheduler {
                 let user_result = match user_result {
                     Ok(count) => Ok(DocumentAdditionResult {
                         indexed_documents: count,
-                        number_of_documents: count,
+                        number_of_documents: count, // TODO: this is wrong, we should use the value stored in the Details.
                     }),
                     Err(e) => Err(milli::Error::from(e)),
                 };
diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs
index 49ce1d021..bccc51543 100644
--- a/index-scheduler/src/error.rs
+++ b/index-scheduler/src/error.rs
@@ -13,6 +13,8 @@ pub enum Error {
     IndexAlreadyExists(String),
     #[error("Corrupted task queue.")]
     CorruptedTaskQueue,
+    #[error("Corrupted dump.")]
+    CorruptedDump,
     #[error("Task `{0}` not found")]
     TaskNotFound(TaskId),
     // TODO: Lo: proper error message for this
@@ -49,14 +51,15 @@ impl ErrorCode for Error {
             Error::InvalidStatus(_) => Code::BadRequest,
             Error::InvalidKind(_) => Code::BadRequest,

-            // TODO: TAMO: are all these errors really internal?
             Error::Dump(e) => e.error_code(),
             Error::Milli(e) => e.error_code(),
+            // TODO: TAMO: are all these errors really internal?
             Error::Heed(_) => Code::Internal,
             Error::FileStore(_) => Code::Internal,
             Error::IoError(_) => Code::Internal,
             Error::Anyhow(_) => Code::Internal,
             Error::CorruptedTaskQueue => Code::Internal,
+            Error::CorruptedDump => Code::Internal,
         }
     }
 }
diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs
index 608bf8e72..b096ece1f 100644
--- a/index-scheduler/src/index_mapper.rs
+++ b/index-scheduler/src/index_mapper.rs
@@ -28,7 +28,7 @@ pub struct IndexMapper {
     base_path: PathBuf,
     index_size: usize,
-    indexer_config: Arc<IndexerConfig>,
+    pub indexer_config: Arc<IndexerConfig>,
 }

 /// Whether the index must not be inserted back
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index e4fd34fd1..3d7f32520 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -9,13 +9,17 @@ mod utils;
 pub type Result<T> = std::result::Result<T, Error>;
 pub type TaskId = u32;

+use dump::{KindDump, TaskDump, UpdateFile};
 pub use error::Error;
+use meilisearch_types::keys::Key;
+use meilisearch_types::milli::documents::DocumentsBatchBuilder;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
+use meilisearch_types::InstanceUid;

 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};

-use file_store::{File, FileStore};
+use file_store::FileStore;
 use meilisearch_types::error::ResponseError;
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
@@ -220,10 +224,6 @@ impl IndexScheduler {
         Ok(this)
     }

-    pub fn import_dump(&self, dump_path: PathBuf) -> Result<()> {
-        todo!()
-    }
-
     /// This function will execute in a different thread and must be called only once.
     fn run(&self) {
         let run = Self {
@@ -254,6 +254,10 @@ impl IndexScheduler {
         });
     }

+    pub fn indexer_config(&self) -> &IndexerConfig {
+        &self.index_mapper.indexer_config
+    }
+
     /// Return the index corresponding to the name. If it wasn't opened before
     /// it'll be opened. But if it doesn't exist on disk it'll throw an
     /// `IndexNotFound` error.
@@ -390,11 +394,138 @@ impl IndexScheduler {
         Ok(task)
     }

-    pub fn create_update_file(&self) -> Result<(Uuid, File)> {
+    /// Register a new task coming from a dump in the scheduler.
+    /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
+    pub fn register_dumpped_task(
+        &mut self,
+        task: TaskDump,
+        content_file: Option<Box<UpdateFile>>,
+        keys: &[Key],
+        instance_uid: Option<InstanceUid>,
+    ) -> Result<Task> {
+        // Currently we don't need to access the tasks queue while loading a dump, thus I can block everything.
+        let mut wtxn = self.env.write_txn()?;
+
+        let content_uuid = if let Some(content_file) = content_file {
+            let (uuid, mut file) = self.create_update_file()?;
+            let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
+            for doc in content_file {
+                builder.append_json_object(&doc?)?;
+            }
+            builder.into_inner()?;
+
+            file.persist()?;
+
+            Some(uuid)
+        } else {
+            None
+        };
+
+        let task = Task {
+            uid: task.uid,
+            enqueued_at: task.enqueued_at,
+            started_at: task.started_at,
+            finished_at: task.finished_at,
+            error: task.error,
+            details: task.details,
+            status: task.status,
+            kind: match task.kind {
+                KindDump::DocumentImport {
+                    primary_key,
+                    method,
+                    documents_count,
+                    allow_index_creation,
+                } => KindWithContent::DocumentImport {
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                    primary_key,
+                    method,
+                    content_file: content_uuid.ok_or(Error::CorruptedDump)?,
+                    documents_count,
+                    allow_index_creation,
+                },
+                KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
+                    documents_ids,
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                },
+                KindDump::DocumentClear => KindWithContent::DocumentClear {
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                },
+                KindDump::Settings {
+                    settings,
+                    is_deletion,
+                    allow_index_creation,
+                } => KindWithContent::Settings {
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                    new_settings: settings,
+                    is_deletion,
+                    allow_index_creation,
+                },
+                KindDump::IndexDeletion => KindWithContent::IndexDeletion {
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                },
+                KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                    primary_key,
+                },
+                KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
+                    index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
+                    primary_key,
+                },
+                KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
+                KindDump::CancelTask { tasks } => KindWithContent::CancelTask { tasks },
+                KindDump::DeleteTasks { query, tasks } => {
+                    KindWithContent::DeleteTasks { query, tasks }
+                }
+                KindDump::DumpExport { dump_uid } => KindWithContent::DumpExport {
+                    dump_uid,
+                    keys: keys.to_vec(),
+                    instance_uid,
+                },
+                KindDump::Snapshot => KindWithContent::Snapshot,
+            },
+        };
+
+        self.all_tasks
+            .append(&mut wtxn, &BEU32::new(task.uid), &task)?;
+
+        if let Some(indexes) = task.indexes() {
+            for index in indexes {
+                self.update_index(&mut wtxn, index, |bitmap| {
+                    bitmap.insert(task.uid);
+                })?;
+            }
+        }
+
+        self.update_status(&mut wtxn, task.status, |bitmap| {
+            bitmap.insert(task.uid);
+        })?;
+
+        self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
+            bitmap.insert(task.uid);
+        })?;
+
+        match wtxn.commit() {
+            Ok(()) => (),
+            _e @ Err(_) => {
+                todo!("remove the data associated with the task");
+                // _e?;
+            }
+        }
+
+        Ok(task)
+    }
+
+    /// Create a new index without any associated task.
+    pub fn create_raw_index(&self, name: &str) -> Result<Index> {
+        let mut wtxn = self.env.write_txn()?;
+        self.index_mapper.create_index(&mut wtxn, name)
+    }
+
+    pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
         Ok(self.file_store.new_update()?)
     }

     #[cfg(test)]
-    pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, File)> {
+    pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
         Ok(self.file_store.new_update_with_uuid(uuid)?)
     }
diff --git a/meilisearch-auth/src/lib.rs b/meilisearch-auth/src/lib.rs
index 1cbdb13e0..2142fb9c7 100644
--- a/meilisearch-auth/src/lib.rs
+++ b/meilisearch-auth/src/lib.rs
@@ -165,6 +165,17 @@ impl AuthController {
             None => Ok(false),
         }
     }
+
+    /// Delete all the keys in the DB.
+    pub fn raw_delete_all_keys(&mut self) -> Result<()> {
+        self.store.delete_all_keys()
+    }
+
+    /// Insert a key directly into the store, exactly as provided.
+    pub fn raw_insert_key(&mut self, key: Key) -> Result<()> {
+        self.store.put_api_key(key)?;
+        Ok(())
+    }
 }

 pub struct AuthFilter {
diff --git a/meilisearch-auth/src/store.rs b/meilisearch-auth/src/store.rs
index 83a22ca3f..efbac3ae0 100644
--- a/meilisearch-auth/src/store.rs
+++ b/meilisearch-auth/src/store.rs
@@ -197,6 +197,13 @@ impl HeedAuthStore {
         Ok(existing)
     }

+    pub fn delete_all_keys(&self) -> Result<()> {
+        let mut wtxn = self.env.write_txn()?;
+        self.keys.clear(&mut wtxn)?;
+        wtxn.commit()?;
+        Ok(())
+    }
+
     pub fn list_api_keys(&self) -> Result<Vec<Key>> {
         let mut list = Vec::new();
         let rtxn = self.env.read_txn()?;
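Taken together, the two new `AuthController` methods give dump import a way to restore keys verbatim. A minimal sketch of the intended call pattern (the `restore_keys` helper name is ours, not part of this change):

use meilisearch_auth::AuthController;
use meilisearch_types::keys::Key;

// Hypothetical helper: wipe whatever keys the fresh database auto-generated,
// then re-insert the dumped keys unchanged (uids, timestamps and all).
fn restore_keys(auth: &mut AuthController, dumped: Vec<Key>) -> anyhow::Result<()> {
    auth.raw_delete_all_keys()?;
    for key in dumped {
        auth.raw_insert_key(key)?;
    }
    Ok(())
}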
diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml
index 547bfa5c9..64d6e6f2b 100644
--- a/meilisearch-http/Cargo.toml
+++ b/meilisearch-http/Cargo.toml
@@ -34,6 +34,7 @@ byte-unit = { version = "4.0.14", default-features = false, features = ["std", "serde"] }
 bytes = "1.2.1"
 clap = { version = "4.0.9", features = ["derive", "env"] }
 crossbeam-channel = "0.5.6"
+dump = { path = "../dump" }
 either = "1.8.0"
 env_logger = "0.9.1"
 flate2 = "1.0.24"
diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index d08d457e0..6fdf07571 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -13,14 +13,28 @@ pub mod metrics;
 #[cfg(feature = "metrics")]
 pub mod route_metrics;

-use std::sync::{atomic::AtomicBool, Arc};
+use std::{
+    fs::File,
+    io::{BufReader, BufWriter, Seek, SeekFrom},
+    path::Path,
+    sync::{atomic::AtomicBool, Arc},
+};

 use crate::error::MeilisearchHttpError;
 use actix_web::error::JsonPayloadError;
 use actix_web::web::Data;
 use analytics::Analytics;
+use anyhow::bail;
 use error::PayloadError;
 use http::header::CONTENT_TYPE;
+use meilisearch_types::{
+    milli::{
+        self,
+        documents::{DocumentsBatchBuilder, DocumentsBatchReader},
+        update::{IndexDocumentsConfig, IndexDocumentsMethod},
+    },
+    settings::apply_settings_to_builder,
+};
 pub use option::Opt;

 use actix_web::{web, HttpRequest};
@@ -31,19 +45,83 @@ use meilisearch_auth::AuthController;

 pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);

+/// Check if a db is empty. It does not provide any information on the
+/// validity of the data in it.
+/// We consider a database as non-empty when it's a non-empty directory.
+fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
+    let db_path = db_path.as_ref();
+
+    if !db_path.exists() {
+        true
+    } else if let Ok(dir) = db_path.read_dir() {
+        dir.count() == 0
+    } else {
+        // if we encounter an error or if the db is a file we consider the db non-empty
+        false
+    }
+}
+
 // TODO: TAMO: Finish setting up things
-pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<IndexScheduler> {
-    let meilisearch = IndexScheduler::new(
-        opt.db_path.join("tasks"),
-        opt.db_path.join("update_files"),
-        opt.db_path.join("indexes"),
-        opt.dumps_dir.clone(),
-        opt.max_index_size.get_bytes() as usize,
-        (&opt.indexer_options).try_into()?,
-        true,
-        #[cfg(test)]
-        todo!("We'll see later"),
-    )?;
+pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthController)> {
+    // we don't want to create anything in the data.ms yet, thus we
+    // wrap our two builders in a closure that'll be executed later.
+    let auth_controller_builder = || AuthController::new(&opt.db_path, &opt.master_key);
+
+    let index_scheduler_builder = || {
+        IndexScheduler::new(
+            opt.db_path.join("tasks"),
+            opt.db_path.join("update_files"),
+            opt.db_path.join("indexes"),
+            opt.dumps_dir.clone(),
+            opt.max_index_size.get_bytes() as usize,
+            (&opt.indexer_options).try_into()?,
+            true,
+            #[cfg(test)]
+            todo!("We'll see later"),
+        )
+    };
+
+    let (index_scheduler, auth_controller) = if let Some(ref _path) = opt.import_snapshot {
+        // handle the snapshot with something akin to the dumps
+        // + the snapshot interval / spawning a thread
+        todo!();
+    } else if let Some(ref path) = opt.import_dump {
+        let empty_db = is_empty_db(&opt.db_path);
+        let src_path_exists = path.exists();
+
+        if empty_db && src_path_exists {
+            let mut index_scheduler = index_scheduler_builder()?;
+            let mut auth_controller = auth_controller_builder()?;
+            import_dump(
+                &opt.db_path,
+                path,
+                &mut index_scheduler,
+                &mut auth_controller,
+            )?;
+            (index_scheduler, auth_controller)
+        } else if !empty_db && !opt.ignore_dump_if_db_exists {
+            bail!(
+                "database already exists at {:?}, try to delete it or rename it",
+                opt.db_path
+                    .canonicalize()
+                    .unwrap_or_else(|_| opt.db_path.to_owned())
+            )
+        } else if !src_path_exists && !opt.ignore_missing_dump {
+            bail!("dump doesn't exist at {:?}", path)
+        } else {
+            let mut index_scheduler = index_scheduler_builder()?;
+            let mut auth_controller = auth_controller_builder()?;
+            import_dump(
+                &opt.db_path,
+                path,
+                &mut index_scheduler,
+                &mut auth_controller,
+            )?;
+            (index_scheduler, auth_controller)
+        }
+    } else {
+        (index_scheduler_builder()?, auth_controller_builder()?)
+    };

     /* TODO: We should start a thread to handle the snapshots.
@@ -53,25 +131,125 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<IndexScheduler> {
         .set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists)
         .set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec))
         .set_snapshot_dir(opt.snapshot_dir.clone())
-        // dump
-        .set_ignore_missing_dump(opt.ignore_missing_dump)
-        .set_ignore_dump_if_db_exists(opt.ignore_dump_if_db_exists)
-        .set_dump_dst(opt.dumps_dir.clone());

     if let Some(ref path) = opt.import_snapshot {
         meilisearch.set_import_snapshot(path.clone());
     }

-    if let Some(ref path) = opt.import_dump {
-        meilisearch.set_dump_src(path.clone());
-    }
-
     if opt.schedule_snapshot {
         meilisearch.set_schedule_snapshot();
     }
     */

-    Ok(meilisearch)
+    Ok((index_scheduler, auth_controller))
+}
+
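A quick sanity-check sketch for `is_empty_db` under the semantics its doc comment describes (using `tempfile`, which this crate already depends on for `import_dump`; the test module name is ours):

#[cfg(test)]
mod is_empty_db_tests {
    use super::is_empty_db;

    #[test]
    fn missing_or_empty_dir_is_empty_everything_else_is_not() {
        let dir = tempfile::tempdir().unwrap();
        // a path that doesn't exist yet counts as an empty db
        assert!(is_empty_db(dir.path().join("data.ms")));
        // an existing but empty directory does too
        assert!(is_empty_db(dir.path()));
        // once it contains anything, it is non-empty
        std::fs::write(dir.path().join("instance-uid"), b"uid").unwrap();
        assert!(!is_empty_db(dir.path()));
    }
}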
+fn import_dump(
+    db_path: &Path,
+    dump_path: &Path,
+    index_scheduler: &mut IndexScheduler,
+    auth: &mut AuthController,
+) -> Result<(), anyhow::Error> {
+    let reader = File::open(dump_path)?;
+    let mut dump_reader = dump::DumpReader::open(reader)?;
+
+    if let Some(date) = dump_reader.date() {
+        log::info!(
+            "Importing a dump of meilisearch `{:?}` generated on {}",
+            dump_reader.version(), // TODO: get the meilisearch version instead of the dump version
+            date
+        );
+    } else {
+        log::info!(
+            "Importing a dump of meilisearch `{:?}`",
+            dump_reader.version(), // TODO: get the meilisearch version instead of the dump version
+        );
+    }
+
+    let instance_uid = dump_reader.instance_uid()?;
+
+    // 1. Import the instance-uid.
+    if let Some(ref instance_uid) = instance_uid {
+        // we don't want to panic if there is an error with the instance-uid.
+        let _ = std::fs::write(
+            db_path.join("instance-uid"),
+            instance_uid.to_string().as_bytes(),
+        );
+    };
+
+    // 2. Import the `Key`s.
+    let mut keys = Vec::new();
+    auth.raw_delete_all_keys()?;
+    for key in dump_reader.keys() {
+        let key = key?;
+        auth.raw_insert_key(key.clone())?;
+        keys.push(key);
+    }
+
+    // 3. Import the tasks.
+    for ret in dump_reader.tasks() {
+        let (task, file) = ret?;
+        index_scheduler.register_dumpped_task(task, file, &keys, instance_uid)?;
+    }
+
+    let indexer_config = index_scheduler.indexer_config();
+
+    // 4. Import the indexes.
+    for index_reader in dump_reader.indexes()? {
+        let mut index_reader = index_reader?;
+        let metadata = index_reader.metadata();
+        log::info!("Importing index `{}`.", metadata.uid);
+        let index = index_scheduler.create_raw_index(&metadata.uid)?;
+
+        let mut wtxn = index.write_txn()?;
+
+        let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
+
+        // 4.1 Import the primary key if there is one.
+        if let Some(ref primary_key) = metadata.primary_key {
+            builder.set_primary_key(primary_key.to_string());
+        }
+
+        // 4.2 Import the settings.
+        log::info!("Importing the settings.");
+        let settings = index_reader.settings()?;
+        apply_settings_to_builder(&settings, &mut builder);
+        builder.execute(|indexing_step| {
+            log::debug!("update: {:?}", indexing_step);
+        })?;
+
+        // 4.3 Import the documents.
+        // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
+        log::info!("Importing the documents.");
+        let mut file = tempfile::tempfile()?;
+        let mut builder = DocumentsBatchBuilder::new(BufWriter::new(&mut file));
+        for document in index_reader.documents()? {
+            builder.append_json_object(&document?)?;
+        }
+        builder.into_inner()?; // this actually flushes the content of the batch builder.
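The two-phase dance in step 4.3 is easy to get wrong: the builder must be fully flushed and the file rewound before the reader can parse it. A stripped-down sketch of the same round trip (the document value is made up):

use std::io::{Seek, SeekFrom};

use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};

// Build a documents batch into a temp file, then read it back, mirroring
// steps 4.3.1 and 4.3.2 above.
fn round_trip() -> anyhow::Result<()> {
    let mut file = tempfile::tempfile()?;
    let mut builder = DocumentsBatchBuilder::new(&mut file);
    let doc = serde_json::json!({ "id": 1, "title": "dune" });
    builder.append_json_object(doc.as_object().unwrap())?;
    builder.into_inner()?; // flush, or the reader will see a truncated batch
    file.seek(SeekFrom::Start(0))?; // rewind before reading
    let reader = DocumentsBatchReader::from_reader(file)?;
    assert_eq!(reader.documents_count(), 1);
    Ok(())
}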
+        // 4.3.2 We feed it to the milli index.
+        file.seek(SeekFrom::Start(0))?;
+        let reader = BufReader::new(file);
+        let reader = DocumentsBatchReader::from_reader(reader)?;
+
+        let builder = milli::update::IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            indexer_config,
+            IndexDocumentsConfig {
+                update_method: IndexDocumentsMethod::ReplaceDocuments,
+                ..Default::default()
+            },
+            |indexing_step| log::debug!("update: {:?}", indexing_step),
+        )?;
+
+        let (builder, user_result) = builder.add_documents(reader)?;
+        log::info!("{} documents found.", user_result?);
+        builder.execute()?;
+        wtxn.commit()?;
+        log::info!("All documents successfully imported.");
+    }
+    Ok(())
+}

 pub fn configure_data(
diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs
index 3d628f742..c76542a50 100644
--- a/meilisearch-http/src/main.rs
+++ b/meilisearch-http/src/main.rs
@@ -48,9 +48,13 @@ async fn main() -> anyhow::Result<()> {
         _ => unreachable!(),
     }

-    let index_scheduler = setup_meilisearch(&opt)?;
-
-    let auth_controller = AuthController::new(&opt.db_path, &opt.master_key)?;
+    let (index_scheduler, auth_controller) = match setup_meilisearch(&opt) {
+        Ok(ret) => ret,
+        Err(e) => {
+            std::fs::remove_dir_all(opt.db_path)?;
+            return Err(e);
+        }
+    };

     #[cfg(all(not(debug_assertions), feature = "analytics"))]
     let analytics = if !opt.no_analytics {
diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs
index a4a67ea7e..039511b61 100644
--- a/meilisearch-http/src/routes/indexes/documents.rs
+++ b/meilisearch-http/src/routes/indexes/documents.rs
@@ -242,7 +242,9 @@ async fn document_addition(

     let (uuid, mut update_file) = index_scheduler.create_update_file()?;

+    // TODO: this can be slow, maybe we should spawn a thread? But the payload isn't Send+Sync :weary:
     // push the entire stream into a `Vec`.
+    // If someone sends us a never-ending stream we're going to block the thread.
     // TODO: Maybe we should write it to a file to reduce the RAM consumption
     // and then reread it to convert it to obkv?
     let mut buffer = Vec::new();
diff --git a/meilisearch-types/src/keys.rs b/meilisearch-types/src/keys.rs
index c2773b548..50c776767 100644
--- a/meilisearch-types/src/keys.rs
+++ b/meilisearch-types/src/keys.rs
@@ -14,7 +14,7 @@ type Result<T> = std::result::Result<T, Error>;

 pub type KeyId = Uuid;

-#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
 pub struct Key {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub description: Option<String>,
diff --git a/meilisearch-types/src/star_or.rs b/meilisearch-types/src/star_or.rs
index 02c9c3524..e42821234 100644
--- a/meilisearch-types/src/star_or.rs
+++ b/meilisearch-types/src/star_or.rs
@@ -7,7 +7,7 @@ use std::str::FromStr;

 /// A type that tries to match either a star (*) or
 /// any other thing that implements `FromStr`.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum StarOr<T> {
     Star,
     Other(T),