diff --git a/meilisearch-http/build.rs b/meilisearch-http/build.rs index 5dbde1477..557e04fe7 100644 --- a/meilisearch-http/build.rs +++ b/meilisearch-http/build.rs @@ -50,7 +50,7 @@ mod mini_dashboard { sha1_file.read_to_string(&mut sha1)?; if sha1 == meta["sha1"].as_str().unwrap() { // Nothing to do. - return Ok(()) + return Ok(()); } } @@ -62,7 +62,11 @@ mod mini_dashboard { hasher.update(&dashboard_assets_bytes); let sha1 = hex::encode(hasher.finalize()); - assert_eq!(meta["sha1"].as_str().unwrap(), sha1, "Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml"); + assert_eq!( + meta["sha1"].as_str().unwrap(), + sha1, + "Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml" + ); create_dir_all(&dashboard_dir)?; let cursor = Cursor::new(&dashboard_assets_bytes); diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index 008065d74..9f8a688bc 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use sha2::Digest; use crate::index::{Checked, Settings}; -use crate::index_controller::{IndexController, IndexStats, Stats, DumpInfo, IndexMetadata, IndexSettings}; +use crate::index_controller::{ + DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats, +}; use crate::option::Opt; pub mod search; @@ -67,7 +69,11 @@ impl Data { api_keys.generate_missing_api_keys(); - let inner = DataInner { index_controller, api_keys, options }; + let inner = DataInner { + index_controller, + api_keys, + options, + }; let inner = Arc::new(inner); Ok(Data { inner }) diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index 6489716ca..07bd96fb9 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -299,7 +299,7 @@ impl From for Error { JsonPayloadError::Payload(err) => { Error::BadRequest(format!("Problem while decoding the request: {}", err)) } - e => Error::Internal(format!("Unexpected Json error: {}", e)) + e => Error::Internal(format!("Unexpected Json error: {}", e)), } } } @@ -310,7 +310,7 @@ impl From for Error { QueryPayloadError::Deserialize(err) => { Error::BadRequest(format!("Invalid query parameters: {}", err)) } - e => Error::Internal(format!("Unexpected query payload error: {}", e)) + e => Error::Internal(format!("Unexpected query payload error: {}", e)), } } } diff --git a/meilisearch-http/src/helpers/authentication.rs b/meilisearch-http/src/helpers/authentication.rs index a1a0c431e..54d5488f4 100644 --- a/meilisearch-http/src/helpers/authentication.rs +++ b/meilisearch-http/src/helpers/authentication.rs @@ -1,16 +1,16 @@ use std::pin::Pin; use std::task::{Context, Poll}; +use actix_web::body::Body; use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; use actix_web::web; -use actix_web::body::Body; -use futures::ready; -use futures::future::{ok, Future, Ready}; use actix_web::ResponseError as _; +use futures::future::{ok, Future, Ready}; +use futures::ready; use pin_project::pin_project; -use crate::Data; use crate::error::{Error, ResponseError}; +use crate::Data; #[derive(Clone, Copy)] pub enum Authentication { @@ -59,19 +59,15 @@ where let data = req.app_data::>().unwrap(); if data.api_keys().master.is_none() { - return AuthenticationFuture::Authenticated(self.service.call(req)) + return AuthenticationFuture::Authenticated(self.service.call(req)); } let auth_header = match req.headers().get("X-Meili-API-Key") { Some(auth) => match auth.to_str() { Ok(auth) => 
auth,
-                    Err(_) => {
-                        return AuthenticationFuture::NoHeader(Some(req))
-                    }
+                    Err(_) => return AuthenticationFuture::NoHeader(Some(req)),
                 },
-                None => {
-                    return AuthenticationFuture::NoHeader(Some(req))
-                }
+                None => return AuthenticationFuture::NoHeader(Some(req)),
             };

         let authenticated = match self.acl {
@@ -111,15 +107,13 @@ where
 {
     type Output = Result<ServiceResponse<Body>, actix_web::Error>;

-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) ->Poll<Self::Output> {
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
         match this {
-            AuthProj::Authenticated(fut) => {
-                match ready!(fut.poll(cx)) {
-                    Ok(resp) => Poll::Ready(Ok(resp)),
-                    Err(e) => Poll::Ready(Err(e)),
-                }
-            }
+            AuthProj::Authenticated(fut) => match ready!(fut.poll(cx)) {
+                Ok(resp) => Poll::Ready(Ok(resp)),
+                Err(e) => Poll::Ready(Err(e)),
+            },
             AuthProj::NoHeader(req) => {
                 match req.take() {
                     Some(req) => {
@@ -135,7 +129,8 @@ where
             AuthProj::Refused(req) => {
                 match req.take() {
                     Some(req) => {
-                        let bad_token = req.headers()
+                        let bad_token = req
+                            .headers()
                             .get("X-Meili-API-Key")
                             .map(|h| h.to_str().map(String::from).unwrap_or_default())
                             .unwrap_or_default();
diff --git a/meilisearch-http/src/index/dump.rs b/meilisearch-http/src/index/dump.rs
new file mode 100644
index 000000000..13e6cbc02
--- /dev/null
+++ b/meilisearch-http/src/index/dump.rs
@@ -0,0 +1,132 @@
+use std::fs::{create_dir_all, File};
+use std::io::{BufRead, BufReader, Write};
+use std::path::Path;
+use std::sync::Arc;
+
+use anyhow::{bail, Context};
+use heed::RoTxn;
+use indexmap::IndexMap;
+use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
+use serde::{Deserialize, Serialize};
+
+use crate::option::IndexerOpts;
+
+use super::{update_handler::UpdateHandler, Index, Settings, Unchecked};
+
+#[derive(Serialize, Deserialize)]
+struct DumpMeta {
+    settings: Settings<Unchecked>,
+    primary_key: Option<String>,
+}
+
+const META_FILE_NAME: &str = "meta.json";
+const DATA_FILE_NAME: &str = "documents.jsonl";
+
+impl Index {
+    pub fn dump(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        // acquire a write txn to make sure any ongoing write is finished before we start.
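+        // The write txn also blocks until in-flight writes commit; since heed's
+        // `RwTxn` derefs to `RoTxn`, the same txn is handed to the read-only
+        // dump helpers below, giving the dump a consistent view of the index.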
+        let txn = self.env.write_txn()?;
+
+        self.dump_documents(&txn, &path)?;
+        self.dump_meta(&txn, &path)?;
+
+        Ok(())
+    }
+
+    fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let document_file_path = path.as_ref().join(DATA_FILE_NAME);
+        let mut document_file = File::create(&document_file_path)?;
+
+        let documents = self.all_documents(txn)?;
+        let fields_ids_map = self.fields_ids_map(txn)?;
+
+        // dump documents
+        let mut json_map = IndexMap::new();
+        for document in documents {
+            let (_, reader) = document?;
+
+            for (fid, bytes) in reader.iter() {
+                if let Some(name) = fields_ids_map.name(fid) {
+                    json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
+                }
+            }
+
+            serde_json::to_writer(&mut document_file, &json_map)?;
+            document_file.write_all(b"\n")?;
+
+            json_map.clear();
+        }
+
+        Ok(())
+    }
+
+    fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        let meta_file_path = path.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::create(&meta_file_path)?;
+
+        let settings = self.settings_txn(txn)?.into_unchecked();
+        let primary_key = self.primary_key(txn)?.map(String::from);
+        let meta = DumpMeta {
+            settings,
+            primary_key,
+        };
+
+        serde_json::to_writer(&mut meta_file, &meta)?;
+
+        Ok(())
+    }
+
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        size: usize,
+        indexing_options: &IndexerOpts,
+    ) -> anyhow::Result<()> {
+        let dir_name = src
+            .as_ref()
+            .file_name()
+            .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
+        let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
+        create_dir_all(&dst_dir_path)?;
+
+        let meta_path = src.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::open(meta_path)?;
+        let DumpMeta {
+            settings,
+            primary_key,
+        } = serde_json::from_reader(&mut meta_file)?;
+        let settings = settings.check();
+        let index = Self::open(&dst_dir_path, size)?;
+        let mut txn = index.write_txn()?;
+
+        let handler = UpdateHandler::new(&indexing_options)?;
+
+        index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
+
+        let document_file_path = src.as_ref().join(DATA_FILE_NAME);
+        let reader = File::open(&document_file_path)?;
+        let mut reader = BufReader::new(reader);
+        reader.fill_buf()?;
+        // If the document file is empty, we don't perform the document addition, to prevent
+        // a primary key error from being thrown.
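+        // `fill_buf` primes the buffer without consuming it, so `buffer().is_empty()`
+        // reliably detects an empty file while leaving the reader positioned at the
+        // start for the update pipeline.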
+ if !reader.buffer().is_empty() { + index.update_documents_txn( + &mut txn, + JsonStream, + IndexDocumentsMethod::UpdateDocuments, + Some(reader), + handler.update_builder(0), + primary_key.as_deref(), + )?; + } + + txn.commit()?; + + match Arc::try_unwrap(index.0) { + Ok(inner) => inner.prepare_for_closing().wait(), + Err(_) => bail!("Could not close index properly."), + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index f26cc4283..790ac58f0 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -1,17 +1,23 @@ -use std::{collections::{BTreeSet, HashSet}, marker::PhantomData}; +use std::collections::{BTreeSet, HashSet}; +use std::fs::create_dir_all; +use std::marker::PhantomData; use std::ops::Deref; +use std::path::Path; use std::sync::Arc; use anyhow::{bail, Context}; +use heed::{EnvOpenOptions, RoTxn}; use milli::obkv_to_json; use serde_json::{Map, Value}; use crate::helpers::EnvSizer; pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; -pub use updates::{Facets, Settings, Checked, Unchecked}; use serde::{de::Deserializer, Deserialize}; +pub use updates::{Checked, Facets, Settings, Unchecked}; +mod dump; mod search; +pub mod update_handler; mod updates; pub type Document = Map; @@ -36,9 +42,20 @@ where } impl Index { + pub fn open(path: impl AsRef, size: usize) -> anyhow::Result { + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let index = milli::Index::new(options, &path)?; + Ok(Index(Arc::new(index))) + } + pub fn settings(&self) -> anyhow::Result> { let txn = self.read_txn()?; + self.settings_txn(&txn) + } + pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result> { let displayed_attributes = self .displayed_fields(&txn)? .map(|fields| fields.into_iter().map(String::from).collect()); @@ -95,8 +112,6 @@ impl Index { let mut documents = Vec::new(); - println!("fields to display: {:?}", fields_to_display); - for entry in iter { let (_id, obkv) = entry?; let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?; diff --git a/meilisearch-http/src/index/search.rs b/meilisearch-http/src/index/search.rs index 0ff6c1bc3..bf559eb91 100644 --- a/meilisearch-http/src/index/search.rs +++ b/meilisearch-http/src/index/search.rs @@ -90,7 +90,8 @@ impl Index { let mut documents = Vec::new(); let fields_ids_map = self.fields_ids_map(&rtxn).unwrap(); - let displayed_ids = self.displayed_fields_ids(&rtxn)? + let displayed_ids = self + .displayed_fields_ids(&rtxn)? .map(|fields| fields.into_iter().collect::>()) .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect()); @@ -156,10 +157,8 @@ impl Index { }; let stop_words = fst::Set::default(); - let highlighter = Highlighter::new( - &stop_words, - (String::from(""), String::from("")), - ); + let highlighter = + Highlighter::new(&stop_words, (String::from(""), String::from(""))); for (_id, obkv) in self.documents(&rtxn, documents_ids)? 
{ let document = make_document(&all_attributes, &fields_ids_map, obkv)?; @@ -384,17 +383,16 @@ mod test { #[test] fn no_formatted() { let stop_words = fst::Set::default(); - let highlighter = Highlighter::new( - &stop_words, - (String::from(""), String::from("")), - ); + let highlighter = + Highlighter::new(&stop_words, (String::from(""), String::from(""))); let mut fields = FieldsIdsMap::new(); let id = fields.insert("test").unwrap(); let mut buf = Vec::new(); let mut obkv = obkv::KvWriter::new(&mut buf); - obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap(); + obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()) + .unwrap(); obkv.finish().unwrap(); let obkv = obkv::KvReader::new(&buf); @@ -410,8 +408,9 @@ mod test { &highlighter, &matching_words, &all_formatted, - &to_highlight_ids - ).unwrap(); + &to_highlight_ids, + ) + .unwrap(); assert!(value.is_empty()); } @@ -419,17 +418,16 @@ mod test { #[test] fn formatted_no_highlight() { let stop_words = fst::Set::default(); - let highlighter = Highlighter::new( - &stop_words, - (String::from(""), String::from("")), - ); + let highlighter = + Highlighter::new(&stop_words, (String::from(""), String::from(""))); let mut fields = FieldsIdsMap::new(); let id = fields.insert("test").unwrap(); let mut buf = Vec::new(); let mut obkv = obkv::KvWriter::new(&mut buf); - obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap(); + obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()) + .unwrap(); obkv.finish().unwrap(); let obkv = obkv::KvReader::new(&buf); @@ -445,8 +443,9 @@ mod test { &highlighter, &matching_words, &all_formatted, - &to_highlight_ids - ).unwrap(); + &to_highlight_ids, + ) + .unwrap(); assert_eq!(value["test"], "hello"); } @@ -454,17 +453,16 @@ mod test { #[test] fn formatted_with_highlight() { let stop_words = fst::Set::default(); - let highlighter = Highlighter::new( - &stop_words, - (String::from(""), String::from("")), - ); + let highlighter = + Highlighter::new(&stop_words, (String::from(""), String::from(""))); let mut fields = FieldsIdsMap::new(); let id = fields.insert("test").unwrap(); let mut buf = Vec::new(); let mut obkv = obkv::KvWriter::new(&mut buf); - obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap(); + obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()) + .unwrap(); obkv.finish().unwrap(); let obkv = obkv::KvReader::new(&buf); @@ -480,8 +478,9 @@ mod test { &highlighter, &matching_words, &all_formatted, - &to_highlight_ids - ).unwrap(); + &to_highlight_ids, + ) + .unwrap(); assert_eq!(value["test"], "hello"); } diff --git a/meilisearch-http/src/index_controller/update_handler.rs b/meilisearch-http/src/index/update_handler.rs similarity index 97% rename from meilisearch-http/src/index_controller/update_handler.rs rename to meilisearch-http/src/index/update_handler.rs index d0086aadd..6a303b4ce 100644 --- a/meilisearch-http/src/index_controller/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -38,7 +38,7 @@ impl UpdateHandler { }) } - fn update_builder(&self, update_id: u64) -> UpdateBuilder { + pub fn update_builder(&self, update_id: u64) -> UpdateBuilder { // We prepare the update by using the update builder. 
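+        // Now `pub` so the dump loaders can reuse this builder: imported settings
+        // and documents then go through the same indexer limits configured on this
+        // `UpdateHandler` as live updates do.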
let mut update_builder = UpdateBuilder::new(update_id); if let Some(max_nb_chunks) = self.max_nb_chunks { diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 0f4bf3589..046823fb7 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -87,6 +87,28 @@ impl Settings { _kind: PhantomData, } } + + pub fn into_unchecked(self) -> Settings { + let Self { + displayed_attributes, + searchable_attributes, + attributes_for_faceting, + ranking_rules, + stop_words, + distinct_attribute, + .. + } = self; + + Settings { + displayed_attributes, + searchable_attributes, + attributes_for_faceting, + ranking_rules, + stop_words, + distinct_attribute, + _kind: PhantomData, + } + } } impl Settings { diff --git a/meilisearch-http/src/index_controller/dump_actor/actor.rs b/meilisearch-http/src/index_controller/dump_actor/actor.rs index 8e1e48ebe..c78079de6 100644 --- a/meilisearch-http/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-http/src/index_controller/dump_actor/actor.rs @@ -1,27 +1,29 @@ -use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; -use crate::helpers::compression; -use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + use async_stream::stream; use chrono::Utc; -use futures::stream::StreamExt; +use futures::{lock::Mutex, stream::StreamExt}; use log::{error, info}; -use std::{ - collections::HashSet, - path::{Path, PathBuf}, - sync::Arc, -}; use tokio::sync::{mpsc, oneshot, RwLock}; -use uuid::Uuid; +use update_actor::UpdateActorHandle; +use uuid_resolver::UuidResolverHandle; + +use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask}; +use crate::index_controller::{update_actor, uuid_resolver}; pub const CONCURRENT_DUMP_MSG: usize = 10; -pub struct DumpActor { +pub struct DumpActor { inbox: Option>, uuid_resolver: UuidResolver, - index: Index, update: Update, dump_path: PathBuf, - dump_info: Arc>>, + lock: Arc>, + dump_infos: Arc>>, + update_db_size: usize, + index_db_size: usize, } /// Generate uid from creation date @@ -29,26 +31,30 @@ fn generate_uid() -> String { Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() } -impl DumpActor +impl DumpActor where - UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, - Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, - Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, + UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static, + Update: UpdateActorHandle + Send + Sync + Clone + 'static, { pub fn new( inbox: mpsc::Receiver, uuid_resolver: UuidResolver, - index: Index, update: Update, dump_path: impl AsRef, + index_db_size: usize, + update_db_size: usize, ) -> Self { + let dump_infos = Arc::new(RwLock::new(HashMap::new())); + let lock = Arc::new(Mutex::new(())); Self { inbox: Some(inbox), uuid_resolver, - index, update, dump_path: dump_path.as_ref().into(), - dump_info: Arc::new(RwLock::new(None)), + dump_infos, + lock, + index_db_size, + update_db_size, } } @@ -90,149 +96,61 @@ where } async fn handle_create_dump(&self, ret: oneshot::Sender>) { - if self.is_running().await { - ret.send(Err(DumpError::DumpAlreadyRunning)) - .expect("Dump actor is dead"); - return; - } let uid = generate_uid(); let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); - *self.dump_info.write().await = Some(info.clone()); + + let _lock = 
match self.lock.try_lock() { + Some(lock) => lock, + None => { + ret.send(Err(DumpError::DumpAlreadyRunning)) + .expect("Dump actor is dead"); + return; + } + }; + + self.dump_infos + .write() + .await + .insert(uid.clone(), info.clone()); ret.send(Ok(info)).expect("Dump actor is dead"); - let dump_info = self.dump_info.clone(); + let task = DumpTask { + path: self.dump_path.clone(), + uuid_resolver: self.uuid_resolver.clone(), + update_handle: self.update.clone(), + uid: uid.clone(), + update_db_size: self.update_db_size, + index_db_size: self.index_db_size, + }; - let task_result = tokio::task::spawn(perform_dump( - self.dump_path.clone(), - self.uuid_resolver.clone(), - self.index.clone(), - self.update.clone(), - uid.clone(), - )) - .await; + let task_result = tokio::task::spawn(task.run()).await; + + let mut dump_infos = self.dump_infos.write().await; + let dump_infos = dump_infos + .get_mut(&uid) + .expect("dump entry deleted while lock was acquired"); match task_result { Ok(Ok(())) => { - (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done(); + dump_infos.done(); info!("Dump succeed"); } Ok(Err(e)) => { - (*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string()); + dump_infos.with_error(e.to_string()); error!("Dump failed: {}", e); } Err(_) => { + dump_infos.with_error("Unexpected error while performing dump.".to_string()); error!("Dump panicked. Dump status set to failed"); - *dump_info.write().await = Some(DumpInfo::new(uid, DumpStatus::Failed)); } }; } async fn handle_dump_info(&self, uid: String) -> DumpResult { - match &*self.dump_info.read().await { - None => self.dump_from_fs(uid).await, - Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await, + match self.dump_infos.read().await.get(&uid) { Some(info) => Ok(info.clone()), + _ => Err(DumpError::DumpDoesNotExist(uid)), } } - - async fn dump_from_fs(&self, uid: String) -> DumpResult { - self.dump_path - .join(format!("{}.dump", &uid)) - .exists() - .then(|| DumpInfo::new(uid.clone(), DumpStatus::Done)) - .ok_or(DumpError::DumpDoesNotExist(uid)) - } - - async fn is_running(&self) -> bool { - matches!( - *self.dump_info.read().await, - Some(DumpInfo { - status: DumpStatus::InProgress, - .. 
- }) - ) - } -} - -async fn perform_dump( - dump_path: PathBuf, - uuid_resolver: UuidResolver, - index: Index, - update: Update, - uid: String, -) -> anyhow::Result<()> -where - UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, - Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static, - Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, -{ - info!("Performing dump."); - - let dump_dir = dump_path.clone(); - tokio::fs::create_dir_all(&dump_dir).await?; - let temp_dump_dir = - tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let uuids = uuid_resolver.list().await?; - // maybe we could just keep the vec as-is - let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect(); - - if uuids.is_empty() { - return Ok(()); - } - - let indexes = list_indexes(&uuid_resolver, &index).await?; - - // we create one directory by index - for meta in indexes.iter() { - tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?; - } - - let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string()); - metadata.to_path(&temp_dump_path).await?; - - update.dump(uuids, temp_dump_path.clone()).await?; - - let dump_dir = dump_path.clone(); - let dump_path = dump_path.join(format!("{}.dump", uid)); - let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { - let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?; - let temp_dump_file_path = temp_dump_file.path().to_owned(); - compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?; - temp_dump_file.persist(&dump_path)?; - Ok(dump_path) - }) - .await??; - - info!("Created dump in {:?}.", dump_path); - - Ok(()) -} - -async fn list_indexes( - uuid_resolver: &UuidResolver, - index: &Index, -) -> anyhow::Result> -where - UuidResolver: uuid_resolver::UuidResolverHandle, - Index: index_actor::IndexActorHandle, -{ - let uuids = uuid_resolver.list().await?; - - let mut ret = Vec::new(); - - for (uid, uuid) in uuids { - let meta = index.get_index_meta(uuid).await?; - let meta = IndexMetadata { - uuid, - name: uid.clone(), - uid, - meta, - }; - ret.push(meta); - } - - Ok(ret) } diff --git a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs index 601c97c01..ab91aeae6 100644 --- a/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/dump_actor/handle_impl.rs @@ -1,6 +1,8 @@ -use std::path::{Path}; +use std::path::Path; + use actix_web::web::Bytes; use tokio::sync::{mpsc, oneshot}; + use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult}; #[derive(Clone)] @@ -29,13 +31,22 @@ impl DumpActorHandleImpl { pub fn new( path: impl AsRef, uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl, - index: crate::index_controller::index_actor::IndexActorHandleImpl, update: crate::index_controller::update_actor::UpdateActorHandleImpl, + index_db_size: usize, + update_db_size: usize, ) -> anyhow::Result { let (sender, receiver) = mpsc::channel(10); - let actor = DumpActor::new(receiver, uuid_resolver, index, update, path); + let actor = DumpActor::new( + receiver, + uuid_resolver, + update, + path, + index_db_size, + update_db_size, + ); tokio::task::spawn(actor.run()); + Ok(Self { sender }) } } diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs 
b/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs new file mode 100644 index 000000000..ae6adc7cf --- /dev/null +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/mod.rs @@ -0,0 +1,2 @@ +pub mod v1; +pub mod v2; diff --git a/meilisearch-http/src/index_controller/dump_actor/v1.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs similarity index 59% rename from meilisearch-http/src/index_controller/dump_actor/v1.rs rename to meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs index 6f199193c..decd67f87 100644 --- a/meilisearch-http/src/index_controller/dump_actor/v1.rs +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v1.rs @@ -1,12 +1,62 @@ -use std::{collections::{BTreeMap, BTreeSet}, marker::PhantomData}; +use std::collections::{BTreeMap, BTreeSet}; +use std::fs::{create_dir_all, File}; +use std::io::BufRead; +use std::marker::PhantomData; +use std::path::Path; +use std::sync::Arc; -use log::warn; +use heed::EnvOpenOptions; +use log::{error, info, warn}; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; -use crate::{index::Unchecked, index_controller}; -use crate::index::deserialize_some; -use super::*; +use uuid::Uuid; -/// This is the settings used in the last version of meilisearch exporting dump in V1 +use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata}; +use crate::{ + index::{deserialize_some, update_handler::UpdateHandler, Index, Unchecked}, + option::IndexerOpts, +}; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MetadataV1 { + db_version: String, + indexes: Vec, +} + +impl MetadataV1 { + pub fn load_dump( + self, + src: impl AsRef, + dst: impl AsRef, + size: usize, + indexer_options: &IndexerOpts, + ) -> anyhow::Result<()> { + info!( + "Loading dump, dump database version: {}, dump version: V1", + self.db_version + ); + + let uuid_store = HeedUuidStore::new(&dst)?; + for index in self.indexes { + let uuid = Uuid::new_v4(); + uuid_store.insert(index.uid.clone(), uuid)?; + let src = src.as_ref().join(index.uid); + load_index( + &src, + &dst, + uuid, + index.meta.primary_key.as_deref(), + size, + indexer_options, + )?; + } + + Ok(()) + } +} + +// These are the settings used in legacy meilisearch (>>, } +fn load_index( + src: impl AsRef, + dst: impl AsRef, + uuid: Uuid, + primary_key: Option<&str>, + size: usize, + indexer_options: &IndexerOpts, +) -> anyhow::Result<()> { + let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid)); + + create_dir_all(&index_path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let index = milli::Index::new(options, index_path)?; + let index = Index(Arc::new(index)); + + // extract `settings.json` file and import content + let settings = import_settings(&src)?; + let settings: index_controller::Settings = settings.into(); + + let mut txn = index.write_txn()?; + + let handler = UpdateHandler::new(&indexer_options)?; + + index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?; + + let file = File::open(&src.as_ref().join("documents.jsonl"))?; + let mut reader = std::io::BufReader::new(file); + reader.fill_buf()?; + if !reader.buffer().is_empty() { + index.update_documents_txn( + &mut txn, + UpdateFormat::JsonStream, + IndexDocumentsMethod::ReplaceDocuments, + Some(reader), + handler.update_builder(0), + primary_key, + )?; + } + + txn.commit()?; + + // Finaly, we extract the original milli::Index and close 
it + Arc::try_unwrap(index.0) + .map_err(|_e| "Couldn't close the index properly") + .unwrap() + .prepare_for_closing() + .wait(); + + // Updates are ignored in dumps V1. + + Ok(()) +} + /// we need to **always** be able to convert the old settings to the settings currently being used impl From for index_controller::Settings { fn from(settings: Settings) -> Self { @@ -69,54 +173,11 @@ impl From for index_controller::Settings { } /// Extract Settings from `settings.json` file present at provided `dir_path` -fn import_settings(dir_path: &Path) -> anyhow::Result { - let path = dir_path.join("settings.json"); +fn import_settings(dir_path: impl AsRef) -> anyhow::Result { + let path = dir_path.as_ref().join("settings.json"); let file = File::open(path)?; let reader = std::io::BufReader::new(file); let metadata = serde_json::from_reader(reader)?; Ok(metadata) } - - -pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { - let index_path = db_path.join(&format!("indexes/index-{}", uuid)); - info!("Importing a dump from an old version of meilisearch with dump version 1"); - - std::fs::create_dir_all(&index_path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, index_path)?; - let index = Index(Arc::new(index)); - - // extract `settings.json` file and import content - let settings = import_settings(&dump_path)?; - let settings: index_controller::Settings = settings.into(); - let update_builder = UpdateBuilder::new(0); - index.update_settings(&settings.check(), update_builder)?; - - let update_builder = UpdateBuilder::new(1); - let file = File::open(&dump_path.join("documents.jsonl"))?; - let reader = std::io::BufReader::new(file); - - // TODO: TAMO: waiting for milli. 
We should use the result - let _ = index.update_documents( - UpdateFormat::JsonStream, - IndexDocumentsMethod::ReplaceDocuments, - Some(reader), - update_builder, - primary_key, - ); - - // the last step: we extract the original milli::Index and close it - Arc::try_unwrap(index.0) - .map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index") - .unwrap() - .prepare_for_closing() - .wait(); - - // at this point we should handle the import of the updates, but since the update logic is not handled in - // meilisearch we are just going to ignore this part - - Ok(()) -} diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs new file mode 100644 index 000000000..eddd8a3b7 --- /dev/null +++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs @@ -0,0 +1,59 @@ +use std::path::Path; + +use chrono::{DateTime, Utc}; +use log::info; +use serde::{Deserialize, Serialize}; + +use crate::index::Index; +use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}; +use crate::option::IndexerOpts; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MetadataV2 { + db_version: String, + index_db_size: usize, + update_db_size: usize, + dump_date: DateTime, +} + +impl MetadataV2 { + pub fn new(index_db_size: usize, update_db_size: usize) -> Self { + Self { + db_version: env!("CARGO_PKG_VERSION").to_string(), + index_db_size, + update_db_size, + dump_date: Utc::now(), + } + } + + pub fn load_dump( + self, + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + update_db_size: usize, + indexing_options: &IndexerOpts, + ) -> anyhow::Result<()> { + info!( + "Loading dump from {}, dump database version: {}, dump version: V2", + self.dump_date, self.db_version + ); + + info!("Loading index database."); + HeedUuidStore::load_dump(src.as_ref(), &dst)?; + + info!("Loading updates."); + UpdateStore::load_dump(&src, &dst, update_db_size)?; + + info!("Loading indexes."); + let indexes_path = src.as_ref().join("indexes"); + let indexes = indexes_path.read_dir()?; + for index in indexes { + let index = index?; + Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?; + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/dump_actor/message.rs b/meilisearch-http/src/index_controller/dump_actor/message.rs index 14409afbb..dff9f5954 100644 --- a/meilisearch-http/src/index_controller/dump_actor/message.rs +++ b/meilisearch-http/src/index_controller/dump_actor/message.rs @@ -1,7 +1,6 @@ use tokio::sync::oneshot; -use super::{DumpResult, DumpInfo}; - +use super::{DumpInfo, DumpResult}; pub enum DumpMsg { CreateDump { @@ -12,4 +11,3 @@ pub enum DumpMsg { ret: oneshot::Sender>, }, } - diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index 1508f8eb7..66f081e87 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -1,31 +1,32 @@ -mod actor; -mod handle_impl; -mod message; -mod v1; -mod v2; +use std::fs::File; +use std::path::{Path, PathBuf}; -use std::{fs::File, path::Path, sync::Arc}; - -use anyhow::bail; -use heed::EnvOpenOptions; -use log::{error, info}; -use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; +use anyhow::Context; +use chrono::{DateTime, Utc}; +use log::{error, info, warn}; #[cfg(test)] use mockall::automock; 
use serde::{Deserialize, Serialize}; -use tempfile::TempDir; use thiserror::Error; -use uuid::Uuid; +use tokio::fs::create_dir_all; -use super::IndexMetadata; -use crate::helpers::compression; -use crate::index::Index; -use crate::index_controller::uuid_resolver; +use loaders::v1::MetadataV1; +use loaders::v2::MetadataV2; pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; +use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle}; +use crate::{helpers::compression, option::IndexerOpts}; + +mod actor; +mod handle_impl; +mod loaders; +mod message; + +const META_FILE_NAME: &str = "metadata.json"; + pub type DumpResult = std::result::Result; #[derive(Error, Debug)] @@ -40,31 +41,6 @@ pub enum DumpError { DumpDoesNotExist(String), } -#[derive(Debug, Serialize, Deserialize, Copy, Clone)] -enum DumpVersion { - V1, - V2, -} - -impl DumpVersion { - const CURRENT: Self = Self::V2; - - /// Select the good importation function from the `DumpVersion` of metadata - pub fn import_index( - self, - size: usize, - uuid: Uuid, - dump_path: &Path, - db_path: &Path, - primary_key: Option<&str>, - ) -> anyhow::Result<()> { - match self { - Self::V1 => v1::import_index(size, uuid, dump_path, db_path, primary_key), - Self::V2 => v2::import_index(size, uuid, dump_path, db_path, primary_key), - } - } -} - #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait DumpActorHandle { @@ -78,39 +54,16 @@ pub trait DumpActorHandle { } #[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Metadata { - indexes: Vec, - db_version: String, - dump_version: DumpVersion, +#[serde(tag = "dumpVersion")] +pub enum Metadata { + V1(MetadataV1), + V2(MetadataV2), } impl Metadata { - /// Create a Metadata with the current dump version of meilisearch. 
- pub fn new(indexes: Vec, db_version: String) -> Self { - Metadata { - indexes, - db_version, - dump_version: DumpVersion::CURRENT, - } - } - - /// Extract Metadata from `metadata.json` file present at provided `dir_path` - fn from_path(dir_path: &Path) -> anyhow::Result { - let path = dir_path.join("metadata.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; - - Ok(metadata) - } - - /// Write Metadata in `metadata.json` file at provided `dir_path` - pub async fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> { - let path = dir_path.join("metadata.json"); - tokio::fs::write(path, serde_json::to_string(self)?).await?; - - Ok(()) + pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self { + let meta = MetadataV2::new(index_db_size, update_db_size); + Self::V2(meta) } } @@ -129,6 +82,9 @@ pub struct DumpInfo { pub status: DumpStatus, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, + started_at: DateTime, + #[serde(skip_serializing_if = "Option::is_none")] + finished_at: Option>, } impl DumpInfo { @@ -137,15 +93,19 @@ impl DumpInfo { uid, status, error: None, + started_at: Utc::now(), + finished_at: None, } } pub fn with_error(&mut self, error: String) { self.status = DumpStatus::Failed; + self.finished_at = Some(Utc::now()); self.error = Some(error); } pub fn done(&mut self) { + self.finished_at = Some(Utc::now()); self.status = DumpStatus::Done; } @@ -155,80 +115,100 @@ impl DumpInfo { } pub fn load_dump( - db_path: impl AsRef, - dump_path: impl AsRef, - size: usize, + dst_path: impl AsRef, + src_path: impl AsRef, + index_db_size: usize, + update_db_size: usize, + indexer_opts: &IndexerOpts, ) -> anyhow::Result<()> { - info!("Importing dump from {}...", dump_path.as_ref().display()); - let db_path = db_path.as_ref(); - let dump_path = dump_path.as_ref(); - let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?; + let tmp_src = tempfile::tempdir_in(".")?; + let tmp_src_path = tmp_src.path(); - // extract the dump in a temporary directory - let tmp_dir = TempDir::new_in(db_path)?; - let tmp_dir_path = tmp_dir.path(); - compression::from_tar_gz(dump_path, tmp_dir_path)?; + compression::from_tar_gz(&src_path, tmp_src_path)?; - // read dump metadata - let metadata = Metadata::from_path(&tmp_dir_path)?; + let meta_path = tmp_src_path.join(META_FILE_NAME); + let mut meta_file = File::open(&meta_path)?; + let meta: Metadata = serde_json::from_reader(&mut meta_file)?; - // remove indexes which have same `uuid` than indexes to import and create empty indexes - let existing_index_uids = uuid_resolver.list()?; + let dst_dir = dst_path + .as_ref() + .parent() + .with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?; - info!("Deleting indexes already present in the db and provided in the dump..."); - for idx in &metadata.indexes { - if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) { - // if we find the index in the `uuid_resolver` it's supposed to exist on the file system - // and we want to delete it - let path = db_path.join(&format!("indexes/index-{}", uuid)); - info!("Deleting {}", path.display()); - use std::io::ErrorKind::*; - match std::fs::remove_dir_all(path) { - Ok(()) => (), - // if an index was present in the metadata but missing of the fs we can ignore the - // problem because we are going to create it later - Err(e) if e.kind() == NotFound => (), - Err(e) => bail!(e), - } - } else { - // if the index 
does not exist in the `uuid_resolver` we create it - uuid_resolver.create_uuid(idx.uid.clone(), false)?; + let tmp_dst = tempfile::tempdir_in(dst_dir)?; + + match meta { + Metadata::V1(meta) => { + meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)? } + Metadata::V2(meta) => meta.load_dump( + &tmp_src_path, + tmp_dst.path(), + index_db_size, + update_db_size, + indexer_opts, + )?, + } + // Persist and atomically rename the db + let persisted_dump = tmp_dst.into_path(); + if dst_path.as_ref().exists() { + warn!("Overwriting database at {}", dst_path.as_ref().display()); + std::fs::remove_dir_all(&dst_path)?; } - // import each indexes content - for idx in metadata.indexes { - let dump_path = tmp_dir_path.join(&idx.uid); - // this cannot fail since we created all the missing uuid in the previous loop - let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap(); + std::fs::rename(&persisted_dump, &dst_path)?; - info!( - "Importing dump from {} into {}...", - dump_path.display(), - db_path.display() - ); - metadata.dump_version.import_index( - size, - uuid, - &dump_path, - &db_path, - idx.meta.primary_key.as_ref().map(|s| s.as_ref()), - )?; - info!("Dump importation from {} succeed", dump_path.display()); - } - - // finally we can move all the unprocessed update file into our new DB - // this directory may not exists - let update_path = tmp_dir_path.join("update_files"); - let db_update_path = db_path.join("updates/update_files"); - if update_path.exists() { - let _ = std::fs::remove_dir_all(db_update_path); - std::fs::rename( - tmp_dir_path.join("update_files"), - db_path.join("updates/update_files"), - )?; - } - - info!("Dump importation from {} succeed", dump_path.display()); Ok(()) } + +struct DumpTask { + path: PathBuf, + uuid_resolver: U, + update_handle: P, + uid: String, + update_db_size: usize, + index_db_size: usize, +} + +impl DumpTask +where + U: UuidResolverHandle + Send + Sync + Clone + 'static, + P: UpdateActorHandle + Send + Sync + Clone + 'static, +{ + async fn run(self) -> anyhow::Result<()> { + info!("Performing dump."); + + create_dir_all(&self.path).await?; + + let path_clone = self.path.clone(); + let temp_dump_dir = + tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); + let meta_path = temp_dump_path.join(META_FILE_NAME); + let mut meta_file = File::create(&meta_path)?; + serde_json::to_writer(&mut meta_file, &meta)?; + + let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?; + + self.update_handle + .dump(uuids, temp_dump_path.clone()) + .await?; + + let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result { + let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; + compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?; + + let dump_path = self.path.join(self.uid).with_extension("dump"); + temp_dump_file.persist(&dump_path)?; + + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/dump_actor/v2.rs b/meilisearch-http/src/index_controller/dump_actor/v2.rs deleted file mode 100644 index eeda78e8a..000000000 --- a/meilisearch-http/src/index_controller/dump_actor/v2.rs +++ /dev/null @@ -1,89 +0,0 @@ -use heed::EnvOpenOptions; -use log::info; -use uuid::Uuid; -use crate::{index::Unchecked, index_controller::{UpdateStatus, update_actor::UpdateStore}}; -use 
std::io::BufRead; -use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}}; -use crate::index::{Checked, Index}; -use crate::index_controller::Settings; -use std::{fs::File, path::Path, sync::Arc}; - -/// Extract Settings from `settings.json` file present at provided `dir_path` -fn import_settings(dir_path: &Path) -> anyhow::Result> { - let path = dir_path.join("settings.json"); - let file = File::open(path)?; - let reader = std::io::BufReader::new(file); - let metadata: Settings = serde_json::from_reader(reader)?; - - println!("Meta: {:?}", metadata); - - Ok(metadata.check()) -} - -pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { - let index_path = db_path.join(&format!("indexes/index-{}", uuid)); - std::fs::create_dir_all(&index_path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, index_path)?; - let index = Index(Arc::new(index)); - - let mut txn = index.write_txn()?; - - info!("importing the settings..."); - // extract `settings.json` file and import content - let settings = import_settings(&dump_path)?; - let update_builder = UpdateBuilder::new(0); - index.update_settings_txn(&mut txn, &settings, update_builder)?; - - // import the documents in the index - let update_builder = UpdateBuilder::new(1); - let file = File::open(&dump_path.join("documents.jsonl"))?; - let reader = std::io::BufReader::new(file); - - info!("importing the documents..."); - // TODO: TAMO: currently we ignore any error caused by the importation of the documents because - // if there is no documents nor primary key it'll throw an anyhow error, but we must remove - // this before the merge on main - index.update_documents_txn( - &mut txn, - UpdateFormat::JsonStream, - IndexDocumentsMethod::ReplaceDocuments, - Some(reader), - update_builder, - primary_key, - )?; - - txn.commit()?; - - // the last step: we extract the original milli::Index and close it - Arc::try_unwrap(index.0) - .map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index") - .unwrap() - .prepare_for_closing() - .wait(); - - info!("importing the updates..."); - import_updates(uuid, dump_path, db_path) -} - -fn import_updates(uuid: Uuid, dump_path: &Path, db_path: &Path) -> anyhow::Result<()> { - let update_path = db_path.join("updates"); - let options = EnvOpenOptions::new(); - // create an UpdateStore to import the updates - std::fs::create_dir_all(&update_path)?; - let (update_store, _) = UpdateStore::create(options, &update_path)?; - let file = File::open(&dump_path.join("updates.jsonl"))?; - let reader = std::io::BufReader::new(file); - - let mut wtxn = update_store.env.write_txn()?; - for update in reader.lines() { - let mut update: UpdateStatus = serde_json::from_str(&update?)?; - if let Some(path) = update.content_path_mut() { - *path = update_path.join("update_files").join(&path); - } - update_store.register_raw_updates(&mut wtxn, update, uuid)?; - } - wtxn.commit()?; - Ok(()) -} diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index 1f0091265..31e2a58d4 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -6,14 +6,15 @@ use async_stream::stream; use futures::stream::StreamExt; use heed::CompactionOption; use log::debug; -use tokio::sync::mpsc; use tokio::task::spawn_blocking; +use 
tokio::{fs, sync::mpsc}; use uuid::Uuid; -use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; +use crate::index::{ + update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings, +}; use crate::index_controller::{ - get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed, - Processing, + get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing, }; use crate::option::IndexerOpts; @@ -30,10 +31,14 @@ pub struct IndexActor { impl IndexActor { pub fn new(receiver: mpsc::Receiver, store: S) -> IndexResult { let options = IndexerOpts::default(); - let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; + let update_handler = UpdateHandler::new(&options)?; let update_handler = Arc::new(update_handler); let receiver = Some(receiver); - Ok(Self { receiver, update_handler, store }) + Ok(Self { + receiver, + update_handler, + store, + }) } /// `run` poll the write_receiver and read_receiver concurrently, but while messages send @@ -122,8 +127,8 @@ impl IndexActor { Snapshot { uuid, path, ret } => { let _ = ret.send(self.handle_snapshot(uuid, path).await); } - Dump { uid, uuid, path, ret } => { - let _ = ret.send(self.handle_dump(&uid, uuid, path).await); + Dump { uuid, path, ret } => { + let _ = ret.send(self.handle_dump(uuid, path).await); } GetStats { uuid, ret } => { let _ = ret.send(self.handle_get_stats(uuid).await); @@ -146,9 +151,7 @@ impl IndexActor { primary_key: Option, ) -> IndexResult { let index = self.store.create(uuid, primary_key).await?; - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) } @@ -165,9 +168,9 @@ impl IndexActor { None => self.store.create(uuid, None).await?, }; - spawn_blocking(move || update_handler.handle_update(meta, data, index)) - .await - .map_err(|e| IndexError::Error(e.into())) + let result = + spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?; + Ok(result) } async fn handle_settings(&self, uuid: Uuid) -> IndexResult> { @@ -176,9 +179,8 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.settings().map_err(IndexError::Error)) - .await - .map_err(|e| IndexError::Error(e.into()))? + let result = spawn_blocking(move || index.settings()).await??; + Ok(result) } async fn handle_fetch_documents( @@ -193,13 +195,11 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_documents(offset, limit, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? + let result = + spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) + .await??; + + Ok(result) } async fn handle_fetch_document( @@ -213,13 +213,12 @@ impl IndexActor { .get(uuid) .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_document(doc_id, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? + + let result = + spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) + .await??; + + Ok(result) } async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> { @@ -242,9 +241,7 @@ impl IndexActor { async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult { match self.store.get(uuid).await? 
{ Some(index) => { - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; Ok(meta) } None => Err(IndexError::UnexistingIndex), @@ -262,7 +259,7 @@ impl IndexActor { .await? .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || match index_settings.primary_key { + let result = spawn_blocking(move || match index_settings.primary_key { Some(ref primary_key) => { let mut txn = index.write_txn()?; if index.primary_key(&txn)?.is_some() { @@ -278,23 +275,22 @@ impl IndexActor { Ok(meta) } }) - .await - .map_err(|e| IndexError::Error(e.into()))? + .await??; + + Ok(result) } async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> { use tokio::fs::create_dir_all; path.push("indexes"); - create_dir_all(&path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + create_dir_all(&path).await?; if let Some(index) = self.store.get(uuid).await? { let mut index_path = path.join(format!("index-{}", uuid)); - create_dir_all(&index_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + + create_dir_all(&index_path).await?; + index_path.push("data.mdb"); spawn_blocking(move || -> anyhow::Result<()> { // Get write txn to wait for ongoing write transaction before snapshot. @@ -304,9 +300,7 @@ impl IndexActor { .copy_to_path(index_path, CompactionOption::Enabled)?; Ok(()) }) - .await - .map_err(|e| IndexError::Error(e.into()))? - .map_err(IndexError::Error)?; + .await??; } Ok(()) @@ -314,50 +308,17 @@ impl IndexActor { /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the /// documents and all the settings. - async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> { - use std::io::prelude::*; - use tokio::fs::create_dir_all; + async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; - create_dir_all(&path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + let path = path.join(format!("indexes/index-{}/", uuid)); + fs::create_dir_all(&path).await?; - if let Some(index) = self.store.get(uuid).await? { - let documents_path = path.join(uid).join("documents.jsonl"); - let settings_path = path.join(uid).join("settings.json"); - - spawn_blocking(move || -> anyhow::Result<()> { - // first we dump all the documents - let file = File::create(documents_path)?; - let mut file = std::io::BufWriter::new(file); - - // Get write txn to wait for ongoing write transaction before dump. - let txn = index.write_txn()?; - let fields_ids_map = index.fields_ids_map(&txn)?; - // we want to save **all** the fields in the dump. - let fields_to_dump: Vec = fields_ids_map.iter().map(|(id, _)| id).collect(); - - for document in index.all_documents(&txn)? { - let (_doc_id, document) = document?; - let json = milli::obkv_to_json(&fields_to_dump, &fields_ids_map, document)?; - file.write_all(serde_json::to_string(&json)?.as_bytes())?; - file.write_all(b"\n")?; - } - - // then we dump all the settings - let file = File::create(settings_path)?; - let mut file = std::io::BufWriter::new(file); - let settings = index.settings()?; - - file.write_all(serde_json::to_string(&settings)?.as_bytes())?; - file.write_all(b"\n")?; - - Ok(()) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? 
- .map_err(IndexError::Error)?; - } + tokio::task::spawn_blocking(move || index.dump(path)).await??; Ok(()) } @@ -379,7 +340,6 @@ impl IndexActor { fields_distribution: index.fields_distribution(&rtxn)?, }) }) - .await - .map_err(|e| IndexError::Error(e.into()))? + .await? } } diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 64b63e5f0..6bf83c647 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -3,7 +3,10 @@ use std::path::{Path, PathBuf}; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; -use crate::{index::Checked, index_controller::{IndexSettings, IndexStats, Processing}}; +use crate::{ + index::Checked, + index_controller::{IndexSettings, IndexStats, Processing}, +}; use crate::{ index::{Document, SearchQuery, SearchResult, Settings}, index_controller::{Failed, Processed}, @@ -136,9 +139,9 @@ impl IndexActorHandle for IndexActorHandleImpl { Ok(receiver.await.expect("IndexActor has been killed")?) } - async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Dump { uid, uuid, path, ret }; + let msg = IndexMsg::Dump { uuid, path, ret }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs index 37faa1e31..377b2c333 100644 --- a/meilisearch-http/src/index_controller/index_actor/message.rs +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use tokio::sync::oneshot; use uuid::Uuid; -use crate::index::{Document, SearchQuery, SearchResult, Settings, Checked}; +use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index_controller::{Failed, IndexStats, Processed, Processing}; use super::{IndexMeta, IndexResult, IndexSettings}; @@ -61,7 +61,6 @@ pub enum IndexMsg { ret: oneshot::Sender>, }, Dump { - uid: String, uuid: Uuid, path: PathBuf, ret: oneshot::Sender>, diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 3b92b1078..1ddc0199e 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -15,7 +15,7 @@ use message::IndexMsg; use store::{IndexStore, MapIndexStore}; use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{Failed, Processed, Processing, IndexStats}; +use crate::index_controller::{Failed, IndexStats, Processed, Processing}; use super::IndexSettings; @@ -44,24 +44,45 @@ impl IndexMeta { let created_at = index.created_at(&txn)?; let updated_at = index.updated_at(&txn)?; let primary_key = index.primary_key(&txn)?.map(String::from); - Ok(Self { created_at, updated_at, primary_key }) + Ok(Self { + created_at, + updated_at, + primary_key, + }) } } #[derive(Error, Debug)] pub enum IndexError { - #[error("error with index: {0}")] - Error(#[from] anyhow::Error), #[error("index already exists")] IndexAlreadyExists, #[error("Index doesn't exists")] UnexistingIndex, - #[error("Heed error: {0}")] - HeedError(#[from] heed::Error), #[error("Existing primary 
key")] ExistingPrimaryKey, + #[error("Internal Index Error: {0}")] + Internal(String), } +macro_rules! internal_error { + ($($other:path), *) => { + $( + impl From<$other> for IndexError { + fn from(other: $other) -> Self { + Self::Internal(other.to_string()) + } + } + )* + } +} + +internal_error!( + anyhow::Error, + heed::Error, + tokio::task::JoinError, + std::io::Error +); + #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait IndexActorHandle { @@ -97,7 +118,7 @@ pub trait IndexActorHandle { index_settings: IndexSettings, ) -> IndexResult; async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; - async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()>; + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>; async fn get_index_stats(&self, uuid: Uuid) -> IndexResult; } @@ -178,8 +199,8 @@ mod test { self.as_ref().snapshot(uuid, path).await } - async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()> { - self.as_ref().dump(uid, uuid, path).await + async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> { + self.as_ref().dump(uuid, path).await } async fn get_index_stats(&self, uuid: Uuid) -> IndexResult { diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 44f076f2f..11791be48 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use heed::EnvOpenOptions; use tokio::fs; use tokio::sync::RwLock; use tokio::task::spawn_blocking; @@ -48,7 +47,7 @@ impl IndexStore for MapIndexStore { let index_size = self.index_size; let index = spawn_blocking(move || -> IndexResult { - let index = open_index(&path, index_size)?; + let index = Index::open(path, index_size)?; if let Some(primary_key) = primary_key { let mut txn = index.write_txn()?; index.put_primary_key(&mut txn, &primary_key)?; @@ -56,8 +55,7 @@ impl IndexStore for MapIndexStore { } Ok(index) }) - .await - .map_err(|e| IndexError::Error(e.into()))??; + .await??; self.index_store.write().await.insert(uuid, index.clone()); @@ -77,9 +75,7 @@ impl IndexStore for MapIndexStore { } let index_size = self.index_size; - let index = spawn_blocking(move || open_index(path, index_size)) - .await - .map_err(|e| IndexError::Error(e.into()))??; + let index = spawn_blocking(move || Index::open(path, index_size)).await??; self.index_store.write().await.insert(uuid, index.clone()); Ok(Some(index)) } @@ -88,18 +84,8 @@ impl IndexStore for MapIndexStore { async fn delete(&self, uuid: Uuid) -> IndexResult> { let db_path = self.path.join(format!("index-{}", uuid)); - fs::remove_dir_all(db_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; + fs::remove_dir_all(db_path).await?; let index = self.index_store.write().await.remove(&uuid); Ok(index) } } - -fn open_index(path: impl AsRef, size: usize) -> IndexResult { - std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; - Ok(Index(Arc::new(index))) -} diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index d1bb5e170..0615bb731 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ 
@@ -14,24 +14,23 @@ use tokio::sync::mpsc;
 use tokio::time::sleep;
 use uuid::Uuid;

-pub use updates::*;
-pub use dump_actor::{DumpInfo, DumpStatus};
 use dump_actor::DumpActorHandle;
+pub use dump_actor::{DumpInfo, DumpStatus};
 use index_actor::IndexActorHandle;
-use snapshot::{SnapshotService, load_snapshot};
+use snapshot::{load_snapshot, SnapshotService};
 use update_actor::UpdateActorHandle;
-use uuid_resolver::{UuidError, UuidResolverHandle};
+pub use updates::*;
+use uuid_resolver::{UuidResolverError, UuidResolverHandle};

 use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
 use crate::option::Opt;
-use dump_actor::load_dump;
+use self::dump_actor::load_dump;

+mod dump_actor;
 mod index_actor;
 mod snapshot;
-mod dump_actor;
 mod update_actor;
-mod update_handler;
 mod updates;
 mod uuid_resolver;
@@ -94,13 +93,14 @@ impl IndexController {
                 options.ignore_snapshot_if_db_exists,
                 options.ignore_missing_snapshot,
             )?;
-        } else if let Some(ref path) = options.import_dump {
+        } else if let Some(ref src_path) = options.import_dump {
             load_dump(
                 &options.db_path,
-                path,
-                index_size,
+                src_path,
+                options.max_mdb_size.get_bytes() as usize,
+                options.max_udb_size.get_bytes() as usize,
+                &options.indexer_options,
             )?;
-        }

         std::fs::create_dir_all(&path)?;
@@ -112,7 +112,13 @@ impl IndexController {
             &path,
             update_store_size,
         )?;
-        let dump_handle = dump_actor::DumpActorHandleImpl::new(&options.dumps_dir, uuid_resolver.clone(), index_handle.clone(), update_handle.clone())?;
+        let dump_handle = dump_actor::DumpActorHandleImpl::new(
+            &options.dumps_dir,
+            uuid_resolver.clone(),
+            update_handle.clone(),
+            options.max_mdb_size.get_bytes() as usize,
+            options.max_udb_size.get_bytes() as usize,
+        )?;

         if options.schedule_snapshot {
             let snapshot_service = SnapshotService::new(
@@ -159,11 +165,6 @@ impl IndexController {
         // registered and the update_actor that waits for the payload to be sent to it.
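+        // Each payload chunk is forwarded through the channel as-is; the update
+        // actor on the receiving end writes the chunks to an update file before
+        // registering the update in the update store.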
         tokio::task::spawn_local(async move {
             payload
-                .map(|bytes| {
-                    bytes.map_err(|e| {
-                        Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
-                    })
-                })
                 .for_each(|r| async {
                     let _ = sender.send(r).await;
                 })
@@ -176,7 +177,7 @@ impl IndexController {

         match self.uuid_resolver.get(uid).await {
             Ok(uuid) => Ok(perform_update(uuid).await?),
-            Err(UuidError::UnexistingIndex(name)) => {
+            Err(UuidResolverError::UnexistingIndex(name)) => {
                 let uuid = Uuid::new_v4();
                 let status = perform_update(uuid).await?;
                 // ignore if index creation fails now, since it may already have been created
@@ -230,7 +231,7 @@ impl IndexController {

         match self.uuid_resolver.get(uid).await {
             Ok(uuid) => Ok(perform_udpate(uuid).await?),
-            Err(UuidError::UnexistingIndex(name)) if create => {
+            Err(UuidResolverError::UnexistingIndex(name)) if create => {
                 let uuid = Uuid::new_v4();
                 let status = perform_udpate(uuid).await?;
                 // ignore if index creation fails now, since it may already have been created
diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs
index 2a456eb26..daef7d582 100644
--- a/meilisearch-http/src/index_controller/snapshot.rs
+++ b/meilisearch-http/src/index_controller/snapshot.rs
@@ -144,7 +144,7 @@ mod test {
     use crate::index_controller::update_actor::{
         MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError,
     };
-    use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
+    use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidResolverError};

     #[actix_rt::test]
     async fn test_normal() {
@@ -193,7 +193,7 @@ mod test {
             .expect_snapshot()
             .times(1)
             // arbitrary error
-            .returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));
+            .returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));

         let update_handle = MockUpdateActorHandle::new();
@@ -248,7 +248,7 @@ mod test {
             // we expect the function to be called between 2 and 3 times in the given interval.
             .times(2..4)
             // arbitrary error, to short-circuit the function
-            .returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));
+            .returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));

         let update_handle = MockUpdateActorHandle::new();
diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs
index f576ce7a8..7779f2556 100644
--- a/meilisearch-http/src/index_controller/update_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/update_actor/actor.rs
@@ -11,7 +11,7 @@ use tokio::sync::mpsc;
 use uuid::Uuid;

 use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
-use crate::index_controller::index_actor::{IndexActorHandle};
+use crate::index_controller::index_actor::IndexActorHandle;
 use crate::index_controller::{UpdateMeta, UpdateStatus};

 pub struct UpdateActor<D, I> {
@@ -42,7 +42,12 @@ where
         let store = UpdateStore::open(options, &path, index_handle.clone())?;
         std::fs::create_dir_all(path.join("update_files"))?;
         assert!(path.exists());
-        Ok(Self { path, store, inbox, index_handle })
+        Ok(Self {
+            path,
+            store,
+            inbox,
+            index_handle,
+        })
     }

     pub async fn run(mut self) {
@@ -90,9 +95,7 @@ where
         mut payload: mpsc::Receiver<PayloadData<D>>,
     ) -> Result<UpdateStatus> {
         let file_path = match meta {
-            UpdateMeta::DocumentsAddition { .. }
-            | UpdateMeta::DeleteDocuments => {
-
+            UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => {
                let update_file_id = uuid::Uuid::new_v4();
                let path = self
                    .path
@@ -102,39 +105,26 @@ where
                    .write(true)
                    .create(true)
                    .open(&path)
-                    .await
-                    .map_err(|e| UpdateError::Error(Box::new(e)))?;
+                    .await?;

                let mut file_len = 0;
                while let Some(bytes) = payload.recv().await {
-                    match bytes {
-                        Ok(bytes) => {
-                            file_len += bytes.as_ref().len();
-                            file.write_all(bytes.as_ref())
-                                .await
-                                .map_err(|e| UpdateError::Error(Box::new(e)))?;
-                        }
-                        Err(e) => {
-                            return Err(UpdateError::Error(e));
-                        }
-                    }
+                    let bytes = bytes?;
+                    file_len += bytes.as_ref().len();
+                    file.write_all(bytes.as_ref()).await?;
                }

                if file_len != 0 {
-                    file.flush()
-                        .await
-                        .map_err(|e| UpdateError::Error(Box::new(e)))?;
+                    file.flush().await?;
                    let file = file.into_std().await;
-                    Some((file, path))
+                    Some((file, update_file_id))
                } else {
                    // empty update, delete the empty file.
-                    fs::remove_file(&path)
-                        .await
-                        .map_err(|e| UpdateError::Error(Box::new(e)))?;
+                    fs::remove_file(&path).await?;
                    None
                }
            }
-            _ => None
+            _ => None,
        };

         let update_store = self.store.clone();
@@ -143,52 +133,45 @@ where
             use std::io::{copy, sink, BufReader, Seek};

             // If the payload is empty, ignore the check.
-            let path = if let Some((mut file, path)) = file_path {
+            let update_uuid = if let Some((mut file, uuid)) = file_path {
                 // set the file back to the beginning
-                file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?;
+                file.seek(SeekFrom::Start(0))?;
                 // Check that the json payload is valid:
                 let reader = BufReader::new(&mut file);
                 let mut checker = JsonChecker::new(reader);

                 if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
                     // The json file is invalid, we use Serde to get a nice error message:
-                    file.seek(SeekFrom::Start(0))
-                        .map_err(|e| UpdateError::Error(Box::new(e)))?;
-                    let _: serde_json::Value = serde_json::from_reader(file)
-                        .map_err(|e| UpdateError::Error(Box::new(e)))?;
+                    file.seek(SeekFrom::Start(0))?;
+                    let _: serde_json::Value = serde_json::from_reader(file)?;
                 }

-                Some(path)
+                Some(uuid)
             } else {
                 None
             };

             // The payload is valid, we can register it to the update store.
-            update_store
-                .register_update(meta, path, uuid)
-                .map(UpdateStatus::Enqueued)
-                .map_err(|e| UpdateError::Error(Box::new(e)))
+            let status = update_store
+                .register_update(meta, update_uuid, uuid)
+                .map(UpdateStatus::Enqueued)?;
+            Ok(status)
         })
-        .await
-        .map_err(|e| UpdateError::Error(Box::new(e)))?
+        .await?
     }

     async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
         let update_store = self.store.clone();
         tokio::task::spawn_blocking(move || {
-            let result = update_store
-                .list(uuid)
-                .map_err(|e| UpdateError::Error(e.into()))?;
+            let result = update_store.list(uuid)?;
             Ok(result)
         })
-        .await
-        .map_err(|e| UpdateError::Error(Box::new(e)))?
+        .await?
     }

     async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
         let store = self.store.clone();
         let result = store
-            .meta(uuid, id)
-            .map_err(|e| UpdateError::Error(Box::new(e)))?
+            .meta(uuid, id)?
             .ok_or(UpdateError::UnexistingUpdate(id))?;
         Ok(result)
     }
@@ -196,10 +179,7 @@ where
     async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
         let store = self.store.clone();

-        tokio::task::spawn_blocking(move || store.delete_all(uuid))
-            .await
-            .map_err(|e| UpdateError::Error(e.into()))?
-            .map_err(|e| UpdateError::Error(e.into()))?;
+        tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;

         Ok(())
     }
@@ -208,24 +188,22 @@ where
         let index_handle = self.index_handle.clone();
         let update_store = self.store.clone();
-        tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
-            .await
-            .map_err(|e| UpdateError::Error(e.into()))?
-            .map_err(|e| UpdateError::Error(e.into()))?;
+        tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
+            .await??;

         Ok(())
     }

-    async fn handle_dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> {
+    async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
         let index_handle = self.index_handle.clone();
         let update_store = self.store.clone();
+
         tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
             update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
             Ok(())
         })
-        .await
-        .map_err(|e| UpdateError::Error(e.into()))?
-        .map_err(|e| UpdateError::Error(e.into()))?;
+        .await??;
+
         Ok(())
     }
@@ -235,9 +213,7 @@ where
             let info = update_store.get_info()?;
             Ok(info)
         })
-        .await
-        .map_err(|e| UpdateError::Error(e.into()))?
-        .map_err(|e| UpdateError::Error(e.into()))?;
+        .await??;

         Ok(info)
     }
diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs
index a497a3c5c..cc5ba9757 100644
--- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs
+++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs
@@ -71,7 +71,7 @@ where
         receiver.await.expect("update actor killed.")
     }

-    async fn dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> {
+    async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
         let (ret, receiver) = oneshot::channel();
         let msg = UpdateMsg::Dump { uuids, path, ret };
         let _ = self.sender.send(msg).await;
diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs
index 4103ca121..37df2af32 100644
--- a/meilisearch-http/src/index_controller/update_actor/message.rs
+++ b/meilisearch-http/src/index_controller/update_actor/message.rs
@@ -32,7 +32,7 @@ pub enum UpdateMsg {
         ret: oneshot::Sender<Result<()>>,
     },
     Dump {
-        uuids: HashSet<(String, Uuid)>,
+        uuids: HashSet<Uuid>,
         path: PathBuf,
         ret: oneshot::Sender<Result<()>>,
     },
diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs
index 05b793e45..ba89eebe3 100644
--- a/meilisearch-http/src/index_controller/update_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/update_actor/mod.rs
@@ -1,10 +1,11 @@
 mod actor;
 mod handle_impl;
 mod message;
-mod update_store;
+pub mod store;

 use std::{collections::HashSet, path::PathBuf};

+use actix_http::error::PayloadError;
 use thiserror::Error;
 use tokio::sync::mpsc;
 use uuid::Uuid;
@@ -14,23 +15,44 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
 use actor::UpdateActor;
 use message::UpdateMsg;

-pub use update_store::{UpdateStore, UpdateStoreInfo};
 pub use handle_impl::UpdateActorHandleImpl;
+pub use store::{UpdateStore, UpdateStoreInfo};

 pub type Result<T> = std::result::Result<T, UpdateError>;
-type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;
+type PayloadData<D> = std::result::Result<D, PayloadError>;

 #[cfg(test)]
 use mockall::automock;

 #[derive(Debug, Error)]
 pub enum UpdateError {
-    #[error("error with update: {0}")]
-    Error(Box<dyn std::error::Error + Sync + Send + 'static>),
     #[error("Update {0} doesn't exist.")]
     UnexistingUpdate(u64),
+    #[error("Internal error processing update: {0}")]
+    Internal(String),
 }

+macro_rules! internal_error {
+    ($($other:path), *) => {
+        $(
+            impl From<$other> for UpdateError {
+                fn from(other: $other) -> Self {
+                    Self::Internal(other.to_string())
+                }
+            }
+        )*
+    }
+}
+
+internal_error!(
+    heed::Error,
+    std::io::Error,
+    serde_json::Error,
+    PayloadError,
+    tokio::task::JoinError,
+    anyhow::Error
+);
+
 #[async_trait::async_trait]
 #[cfg_attr(test, automock(type Data=Vec<u8>;))]
 pub trait UpdateActorHandle {
@@ -40,7 +62,7 @@ pub trait UpdateActorHandle {
     async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
     async fn delete(&self, uuid: Uuid) -> Result<()>;
     async fn snapshot(&self, uuid: HashSet<Uuid>, path: PathBuf) -> Result<()>;
-    async fn dump(&self, uuid: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()>;
+    async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
     async fn get_info(&self) -> Result<UpdateStoreInfo>;
     async fn update(
         &self,
diff --git a/meilisearch-http/src/index_controller/update_actor/store/codec.rs b/meilisearch-http/src/index_controller/update_actor/store/codec.rs
new file mode 100644
index 000000000..e07b52eec
--- /dev/null
+++ b/meilisearch-http/src/index_controller/update_actor/store/codec.rs
@@ -0,0 +1,86 @@
+use std::{borrow::Cow, convert::TryInto, mem::size_of};
+
+use heed::{BytesDecode, BytesEncode};
+use uuid::Uuid;
+
+pub struct NextIdCodec;
+
+pub enum NextIdKey {
+    Global,
+    Index(Uuid),
+}
+
+impl<'a> BytesEncode<'a> for NextIdCodec {
+    type EItem = NextIdKey;
+
+    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        match item {
+            NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
+            NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
+        }
+    }
+}
+
+pub struct PendingKeyCodec;
+
+impl<'a> BytesEncode<'a> for PendingKeyCodec {
+    type EItem = (u64, Uuid, u64);
+
+    fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
+        bytes.extend_from_slice(&global_id.to_be_bytes());
+        bytes.extend_from_slice(uuid.as_bytes());
+        bytes.extend_from_slice(&update_id.to_be_bytes());
+        Some(Cow::Owned(bytes))
+    }
+}
+
+impl<'a> BytesDecode<'a> for PendingKeyCodec {
+    type DItem = (u64, Uuid, u64);
+
+    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
+        let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
+        let global_id = u64::from_be_bytes(global_id_bytes);
+
+        let uuid_bytes = bytes
+            .get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
+            .try_into()
+            .ok()?;
+        let uuid = Uuid::from_bytes(uuid_bytes);
+
+        let update_id_bytes = bytes
+            .get((size_of::<u64>() + size_of::<Uuid>())..)?
+            .try_into()
+            .ok()?;
+        let update_id = u64::from_be_bytes(update_id_bytes);
+
+        Some((global_id, uuid, update_id))
+    }
+}
+
+pub struct UpdateKeyCodec;
+
+impl<'a> BytesEncode<'a> for UpdateKeyCodec {
+    type EItem = (Uuid, u64);
+
+    fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
+        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
+        bytes.extend_from_slice(uuid.as_bytes());
+        bytes.extend_from_slice(&update_id.to_be_bytes());
+        Some(Cow::Owned(bytes))
+    }
+}
+
+impl<'a> BytesDecode<'a> for UpdateKeyCodec {
+    type DItem = (Uuid, u64);
+
+    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
+        let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
+        let uuid = Uuid::from_bytes(uuid_bytes);
+
+        let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
+        let update_id = u64::from_be_bytes(update_id_bytes);
+
+        Some((uuid, update_id))
+    }
+}
diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
new file mode 100644
index 000000000..8f947e459
--- /dev/null
+++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
@@ -0,0 +1,189 @@
+use std::{
+    collections::HashSet,
+    fs::{create_dir_all, File},
+    io::{BufRead, BufReader, Write},
+    path::{Path, PathBuf},
+};
+
+use heed::{EnvOpenOptions, RoTxn};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use super::UpdateStore;
+use super::{codec::UpdateKeyCodec, State};
+use crate::index_controller::{
+    index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
+    UpdateStatus,
+};
+
+#[derive(Serialize, Deserialize)]
+struct UpdateEntry {
+    uuid: Uuid,
+    update: UpdateStatus,
+}
+
+impl UpdateStore {
+    pub fn dump(
+        &self,
+        uuids: &HashSet<Uuid>,
+        path: PathBuf,
+        handle: impl IndexActorHandle,
+    ) -> anyhow::Result<()> {
+        let state_lock = self.state.write();
+        state_lock.swap(State::Dumping);
+
+        // txn must *always* be acquired after state lock, or it will deadlock.
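+        // (The processing path takes the locks in the same order: state lock
+        // first, then txn. If either side inverted that order, each thread
+        // could end up waiting on the lock the other one holds.)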
+        let txn = self.env.write_txn()?;
+
+        let dump_path = path.join("updates");
+        create_dir_all(&dump_path)?;
+
+        self.dump_updates(&txn, uuids, &dump_path)?;
+
+        let fut = dump_indexes(uuids, handle, &path);
+        tokio::runtime::Handle::current().block_on(fut)?;
+
+        state_lock.swap(State::Idle);
+
+        Ok(())
+    }
+
+    fn dump_updates(
+        &self,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        path: impl AsRef<Path>,
+    ) -> anyhow::Result<()> {
+        let dump_data_path = path.as_ref().join("data.jsonl");
+        let mut dump_data_file = File::create(dump_data_path)?;
+
+        let update_files_path = path.as_ref().join(super::UPDATE_DIR);
+        create_dir_all(&update_files_path)?;
+
+        self.dump_pending(&txn, uuids, &mut dump_data_file, &path)?;
+        self.dump_completed(&txn, uuids, &mut dump_data_file)?;
+
+        Ok(())
+    }
+
+    fn dump_pending(
+        &self,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        mut file: &mut File,
+        dst_path: impl AsRef<Path>,
+    ) -> anyhow::Result<()> {
+        let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
+
+        for pending in pendings {
+            let ((_, uuid, _), data) = pending?;
+            if uuids.contains(&uuid) {
+                let update = data.decode()?;
+
+                if let Some(ref update_uuid) = update.content {
+                    let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
+                    let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
+                    std::fs::copy(src, dst)?;
+                }
+
+                let update_json = UpdateEntry {
+                    uuid,
+                    update: update.into(),
+                };
+
+                serde_json::to_writer(&mut file, &update_json)?;
+                file.write_all(b"\n")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn dump_completed(
+        &self,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        mut file: &mut File,
+    ) -> anyhow::Result<()> {
+        let updates = self
+            .updates
+            .iter(txn)?
+            .remap_key_type::<UpdateKeyCodec>()
+            .lazily_decode_data();
+
+        for update in updates {
+            let ((uuid, _), data) = update?;
+            if uuids.contains(&uuid) {
+                let update = data.decode()?;
+
+                let update_json = UpdateEntry { uuid, update };
+
+                serde_json::to_writer(&mut file, &update_json)?;
+                file.write_all(b"\n")?;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        db_size: usize,
+    ) -> anyhow::Result<()> {
+        let dst_update_path = dst.as_ref().join("updates/");
+        create_dir_all(&dst_update_path)?;
+
+        let mut options = EnvOpenOptions::new();
+        options.map_size(db_size as usize);
+        let (store, _) = UpdateStore::new(options, &dst_update_path)?;
+
+        let src_update_path = src.as_ref().join("updates");
+        let update_data = File::open(&src_update_path.join("data.jsonl"))?;
+        let mut update_data = BufReader::new(update_data);
+
+        std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
+
+        let mut wtxn = store.env.write_txn()?;
+        let mut line = String::new();
+        loop {
+            match update_data.read_line(&mut line) {
+                Ok(0) => break,
+                Ok(_) => {
+                    let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
+                    store.register_raw_updates(&mut wtxn, &update, uuid)?;
+
+                    // Copy the associated update file if it exists
+                    if let UpdateStatus::Enqueued(Enqueued {
+                        content: Some(uuid),
+                        ..
+                    }) = update
+                    {
+                        let src = update_uuid_to_file_path(&src_update_path, uuid);
+                        let dst = update_uuid_to_file_path(&dst_update_path, uuid);
+                        std::fs::copy(src, dst)?;
+                    }
+                }
+                _ => break,
+            }
+
+            line.clear();
+        }
+
+        wtxn.commit()?;
+
+        Ok(())
+    }
+}
+
+async fn dump_indexes(
+    uuids: &HashSet<Uuid>,
+    handle: impl IndexActorHandle,
+    path: impl AsRef<Path>,
+) -> anyhow::Result<()> {
+    for uuid in uuids {
+        handle.dump(*uuid, path.as_ref().to_owned()).await?;
+    }
+
+    Ok(())
+}
diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs
similarity index 74%
rename from meilisearch-http/src/index_controller/update_actor/update_store.rs
rename to meilisearch-http/src/index_controller/update_actor/store/mod.rs
index f91a2740c..28204f4c0 100644
--- a/meilisearch-http/src/index_controller/update_actor/update_store.rs
+++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs
@@ -1,36 +1,35 @@
-use std::collections::{BTreeMap, HashSet};
-use std::convert::TryInto;
+mod codec;
+pub mod dump;
+
 use std::fs::{copy, create_dir_all, remove_file, File};
-use std::mem::size_of;
 use std::path::Path;
 use std::sync::Arc;
-use std::{borrow::Cow, path::PathBuf};
+use std::{
+    collections::{BTreeMap, HashSet},
+    path::PathBuf,
+};

-use anyhow::Context;
 use arc_swap::ArcSwap;
 use futures::StreamExt;
 use heed::types::{ByteSlice, OwnedType, SerdeJson};
 use heed::zerocopy::U64;
-use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
+use heed::{CompactionOption, Database, Env, EnvOpenOptions};
 use log::error;
 use parking_lot::{Mutex, MutexGuard};
 use tokio::runtime::Handle;
 use tokio::sync::mpsc;
 use uuid::Uuid;

+use codec::*;
+
 use super::UpdateMeta;
-use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};
 use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
+use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};

 #[allow(clippy::upper_case_acronyms)]
 type BEU64 = U64<heed::byteorder::BE>;

-struct NextIdCodec;
-
-enum NextIdKey {
-    Global,
-    Index(Uuid),
-}
+const UPDATE_DIR: &str = "update_files";

 pub struct UpdateStoreInfo {
     /// Size of the update store in bytes.
@@ -45,13 +44,13 @@ pub struct StateLock {
     data: ArcSwap<State>,
 }

-struct StateLockGuard<'a> {
+pub struct StateLockGuard<'a> {
     _lock: MutexGuard<'a, ()>,
     state: &'a StateLock,
 }

 impl StateLockGuard<'_> {
-    fn swap(&self, state: State) -> Arc<State> {
+    pub fn swap(&self, state: State) -> Arc<State> {
         self.state.data.swap(Arc::new(state))
     }
 }
@@ -63,11 +62,11 @@ impl StateLock {
         Self { lock, data }
     }

-    fn read(&self) -> Arc<State> {
+    pub fn read(&self) -> Arc<State> {
         self.data.load().clone()
     }

-    fn write(&self) -> StateLockGuard {
+    pub fn write(&self) -> StateLockGuard {
         let _lock = self.lock.lock();
         let state = &self;
         StateLockGuard { _lock, state }
@@ -82,81 +81,6 @@ pub enum State {
     Dumping,
 }

-impl<'a> BytesEncode<'a> for NextIdCodec {
-    type EItem = NextIdKey;
-
-    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
-        match item {
-            NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
-            NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
-        }
-    }
-}
-
-struct PendingKeyCodec;
-
-impl<'a> BytesEncode<'a> for PendingKeyCodec {
-    type EItem = (u64, Uuid, u64);
-
-    fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
-        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
-        bytes.extend_from_slice(&global_id.to_be_bytes());
-        bytes.extend_from_slice(uuid.as_bytes());
-        bytes.extend_from_slice(&update_id.to_be_bytes());
-        Some(Cow::Owned(bytes))
-    }
-}
-
-impl<'a> BytesDecode<'a> for PendingKeyCodec {
-    type DItem = (u64, Uuid, u64);
-
-    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
-        let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
-        let global_id = u64::from_be_bytes(global_id_bytes);
-
-        let uuid_bytes = bytes
-            .get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
-            .try_into()
-            .ok()?;
-        let uuid = Uuid::from_bytes(uuid_bytes);
-
-        let update_id_bytes = bytes
-            .get((size_of::<u64>() + size_of::<Uuid>())..)?
-            .try_into()
-            .ok()?;
-        let update_id = u64::from_be_bytes(update_id_bytes);
-
-        Some((global_id, uuid, update_id))
-    }
-}
-
-struct UpdateKeyCodec;
-
-impl<'a> BytesEncode<'a> for UpdateKeyCodec {
-    type EItem = (Uuid, u64);
-
-    fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
-        let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
-        bytes.extend_from_slice(uuid.as_bytes());
-        bytes.extend_from_slice(&update_id.to_be_bytes());
-        Some(Cow::Owned(bytes))
-    }
-}
-
-impl<'a> BytesDecode<'a> for UpdateKeyCodec {
-    type DItem = (Uuid, u64);
-
-    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
-        let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
-        let uuid = Uuid::from_bytes(uuid_bytes);
-
-        let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
-        let update_id = u64::from_be_bytes(update_id_bytes);
-
-        Some((uuid, update_id))
-    }
-}
-
 #[derive(Clone)]
 pub struct UpdateStore {
     pub env: Env,
@@ -174,19 +98,20 @@ pub struct UpdateStore {
     /// | 16-bytes | 8-bytes |
     updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
     /// Indicates the current state of the update store,
-    state: Arc<StateLock>,
+    pub state: Arc<StateLock>,
     /// Wake up the loop when a new event occurs.
     notification_sender: mpsc::Sender<()>,
+    path: PathBuf,
 }

 impl UpdateStore {
-    pub fn create(
+    fn new(
         mut options: EnvOpenOptions,
         path: impl AsRef<Path>,
     ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
         options.max_dbs(5);
-        let env = options.open(path)?;
+        let env = options.open(&path)?;
         let pending_queue = env.create_database(Some("pending-queue"))?;
         let next_update_id = env.create_database(Some("next-update-id"))?;
         let updates = env.create_database(Some("updates"))?;
@@ -194,8 +119,6 @@ impl UpdateStore {
         let state = Arc::new(StateLock::from_state(State::Idle));
         let (notification_sender, notification_receiver) = mpsc::channel(10);
-        // Send a first notification to trigger the process.
-        let _ = notification_sender.send(());

         Ok((
             Self {
@@ -205,6 +128,7 @@ impl UpdateStore {
                 updates,
                 state,
                 notification_sender,
+                path: path.as_ref().to_owned(),
             },
             notification_receiver,
         ))
@@ -215,9 +139,12 @@ impl UpdateStore {
         path: impl AsRef<Path>,
         index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
     ) -> anyhow::Result<Arc<Self>> {
-        let (update_store, mut notification_receiver) = Self::create(options, path)?;
+        let (update_store, mut notification_receiver) = Self::new(options, path)?;
         let update_store = Arc::new(update_store);

+        // Send a first notification to trigger the process.
+        let _ = update_store.notification_sender.send(());
+
         // Init update loop to perform any pending updates at launch.
         // Since we just launched the update store, and we still own the receiving end of the
         // channel, this call is guaranteed to succeed.
@@ -296,13 +223,13 @@ impl UpdateStore {
     pub fn register_update(
         &self,
         meta: UpdateMeta,
-        content: Option<impl AsRef<Path>>,
+        content: Option<Uuid>,
         index_uuid: Uuid,
     ) -> heed::Result<Enqueued> {
         let mut txn = self.env.write_txn()?;
         let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
-        let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
+        let meta = Enqueued::new(meta, update_id, content);

         self.pending_queue
             .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@@ -320,7 +247,7 @@ impl UpdateStore {
     pub fn register_raw_updates(
         &self,
         wtxn: &mut heed::RwTxn,
-        update: UpdateStatus,
+        update: &UpdateStatus,
         index_uuid: Uuid,
     ) -> heed::Result<()> {
         match update {
@@ -364,13 +291,14 @@ impl UpdateStore {
         let processing = pending.processing();

         // Acquire the state lock and set the current state to processing.
+        // txn must *always* be acquired after state lock, or it will deadlock.
         let state = self.state.write();
         state.swap(State::Processing(index_uuid, processing.clone()));

         let file = match content_path {
-            Some(ref path) => {
-                let file = File::open(path)
-                    .with_context(|| format!("file at path: {:?}", &content_path))?;
+            Some(uuid) => {
+                let path = update_uuid_to_file_path(&self.path, uuid);
+                let file = File::open(path)?;
                 Some(file)
             }
             None => None,
@@ -386,7 +314,8 @@ impl UpdateStore {
         self.pending_queue
             .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;

-        if let Some(path) = content_path {
+        if let Some(uuid) = content_path {
+            let path = update_uuid_to_file_path(&self.path, uuid);
             remove_file(&path)?;
         }
@@ -486,7 +415,7 @@ impl UpdateStore {
     pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
         let mut txn = self.env.write_txn()?;
         // Contains all the content file paths that need to be removed if the deletion was successful.
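+        // The update files themselves are only deleted after the txn commits
+        // (see below), resolving each uuid through update_uuid_to_file_path, so
+        // a failed deletion leaves the files on disk.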
-        let mut paths_to_remove = Vec::new();
+        let mut uuids_to_remove = Vec::new();

         let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
@@ -494,8 +423,8 @@ impl UpdateStore {
             if uuid == index_uuid {
                 pendings.del_current()?;
                 let mut pending = pending.decode()?;
-                if let Some(path) = pending.content.take() {
-                    paths_to_remove.push(path);
+                if let Some(update_uuid) = pending.content.take() {
+                    uuids_to_remove.push(update_uuid);
                 }
             }
         }
@@ -515,9 +444,12 @@ impl UpdateStore {
         txn.commit()?;

-        paths_to_remove.iter().for_each(|path| {
-            let _ = remove_file(path);
-        });
+        uuids_to_remove
+            .iter()
+            .map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
+            .for_each(|path| {
+                let _ = remove_file(path);
+            });

         // We don't care about the currently processing update, since it will be removed by itself
         // once it's done processing, and we can't abort a running update.
@@ -546,7 +478,7 @@ impl UpdateStore {
         // create db snapshot
         self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;

-        let update_files_path = update_path.join("update_files");
+        let update_files_path = update_path.join(UPDATE_DIR);
         create_dir_all(&update_files_path)?;

         let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
@@ -554,10 +486,13 @@ impl UpdateStore {
         for entry in pendings {
             let ((_, uuid, _), pending) = entry?;
             if uuids.contains(&uuid) {
-                if let Some(path) = pending.decode()?.content_path() {
-                    let name = path.file_name().unwrap();
-                    let to = update_files_path.join(name);
-                    copy(path, to)?;
+                if let Enqueued {
+                    content: Some(uuid),
+                    ..
+                } = pending.decode()?
+                {
+                    let path = update_uuid_to_file_path(&self.path, uuid);
+                    copy(path, &update_files_path)?;
                 }
             }
         }
@@ -580,85 +515,17 @@ impl UpdateStore {
         Ok(())
     }

-    pub fn dump(
-        &self,
-        uuids: &HashSet<(String, Uuid)>,
-        path: PathBuf,
-        handle: impl IndexActorHandle,
-    ) -> anyhow::Result<()> {
-        use std::io::prelude::*;
-        let state_lock = self.state.write();
-        state_lock.swap(State::Dumping);
-
-        let txn = self.env.write_txn()?;
-
-        for (index_uid, index_uuid) in uuids.iter() {
-            let file = File::create(path.join(index_uid).join("updates.jsonl"))?;
-            let mut file = std::io::BufWriter::new(file);
-
-            let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
-            for entry in pendings {
-                let ((_, uuid, _), pending) = entry?;
-                if &uuid == index_uuid {
-                    let mut update: UpdateStatus = pending.decode()?.into();
-                    if let Some(path) = update.content_path_mut() {
-                        *path = path.file_name().expect("update path can't be empty").into();
-                    }
-                    serde_json::to_writer(&mut file, &update)?;
-                    file.write_all(b"\n")?;
-                }
-            }
-
-            let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
-            for entry in updates {
-                let (_, update) = entry?;
-                let mut update = update.clone();
-                if let Some(path) = update.content_path_mut() {
-                    *path = path.file_name().expect("update path can't be empty").into();
-                }
-                serde_json::to_writer(&mut file, &update)?;
-                file.write_all(b"\n")?;
-            }
-        }
-
-        let update_files_path = path.join("update_files");
-        create_dir_all(&update_files_path)?;
-
-        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
-
-        for entry in pendings {
-            let ((_, uuid, _), pending) = entry?;
-            if uuids.iter().any(|(_, id)| id == &uuid) {
-                if let Some(path) = pending.decode()?.content_path() {
-                    let name = path.file_name().unwrap();
-                    let to = update_files_path.join(name);
-                    copy(path, to)?;
-                }
-            }
-        }
-
-        // Perform the dump of each index concurently. Only a third of the capabilities of
-        // the index actor at a time not to put too much pressure on the index actor
-        let path = &path;
-
-        let mut stream = futures::stream::iter(uuids.iter())
-            .map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone()))
-            .buffer_unordered(CONCURRENT_INDEX_MSG / 3);
-
-        Handle::current().block_on(async {
-            while let Some(res) = stream.next().await {
-                res?;
-            }
-            Ok(())
-        })
-    }
-
     pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
         let mut size = self.env.size();
         let txn = self.env.read_txn()?;
         for entry in self.pending_queue.iter(&txn)? {
             let (_, pending) = entry?;
-            if let Some(path) = pending.content_path() {
+            if let Enqueued {
+                content: Some(uuid),
+                ..
+            } = pending
+            {
+                let path = update_uuid_to_file_path(&self.path, uuid);
                 size += File::open(path)?.metadata()?.len();
             }
         }
@@ -671,6 +538,12 @@ impl UpdateStore {
     }
 }

+fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
+    root.as_ref()
+        .join(UPDATE_DIR)
+        .join(format!("update_{}", uuid))
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -716,9 +589,7 @@ mod test {
         let uuid = Uuid::new_v4();
         let store_clone = update_store.clone();
         tokio::task::spawn_blocking(move || {
-            store_clone
-                .register_update(meta, Some("here"), uuid)
-                .unwrap();
+            store_clone.register_update(meta, None, uuid).unwrap();
         })
         .await
         .unwrap();
diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs
index 31f0005f8..0aacf9b6c 100644
--- a/meilisearch-http/src/index_controller/updates.rs
+++ b/meilisearch-http/src/index_controller/updates.rs
@@ -1,8 +1,7 @@
-use std::path::{Path, PathBuf};
-
 use chrono::{DateTime, Utc};
 use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
 use serde::{Deserialize, Serialize};
+use uuid::Uuid;

 use crate::index::{Checked, Settings};
@@ -34,11 +33,11 @@ pub struct Enqueued {
     pub update_id: u64,
     pub meta: UpdateMeta,
     pub enqueued_at: DateTime<Utc>,
-    pub content: Option<PathBuf>,
+    pub content: Option<Uuid>,
 }

 impl Enqueued {
-    pub fn new(meta: UpdateMeta, update_id: u64, content: Option<PathBuf>) -> Self {
+    pub fn new(meta: UpdateMeta, update_id: u64, content: Option<Uuid>) -> Self {
         Self {
             enqueued_at: Utc::now(),
             meta,
@@ -68,14 +67,6 @@ impl Enqueued {
     pub fn id(&self) -> u64 {
         self.update_id
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.content.as_deref()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.content.as_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -91,14 +82,6 @@ impl Processed {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -118,14 +101,6 @@ impl Processing {
         self.from.meta()
     }

-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
-
     pub fn process(self, success: UpdateResult) -> Processed {
         Processed {
             success,
@@ -155,14 +130,6 @@ impl Aborted {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -178,14 +145,6 @@ impl Failed {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -215,26 +174,6 @@ impl UpdateStatus {
             _ => None,
         }
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        match self {
-            UpdateStatus::Processing(u) => u.content_path(),
-            UpdateStatus::Processed(u) => u.content_path(),
-            UpdateStatus::Aborted(u) => u.content_path(),
-            UpdateStatus::Failed(u) => u.content_path(),
-            UpdateStatus::Enqueued(u) => u.content_path(),
-        }
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        match self {
-            UpdateStatus::Processing(u) => u.content_path_mut(),
-            UpdateStatus::Processed(u) => u.content_path_mut(),
-            UpdateStatus::Aborted(u) => u.content_path_mut(),
-            UpdateStatus::Failed(u) => u.content_path_mut(),
-            UpdateStatus::Enqueued(u) => u.content_path_mut(),
-        }
-    }
 }

 impl From<Enqueued> for UpdateStatus {
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs
index 253326276..0211cef25 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs
@@ -4,7 +4,7 @@ use log::{info, warn};
 use tokio::sync::mpsc;
 use uuid::Uuid;

-use super::{Result, UuidError, UuidResolveMsg, UuidStore};
+use super::{Result, UuidResolveMsg, UuidResolverError, UuidStore};

 pub struct UuidResolverActor<S> {
     inbox: mpsc::Receiver<UuidResolveMsg>,
@@ -44,6 +44,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
             Some(GetSize { ret }) => {
                 let _ = ret.send(self.handle_get_size().await);
             }
+            Some(DumpRequest { path, ret }) => {
+                let _ = ret.send(self.handle_dump(path).await);
+            }
             // all senders have been dropped, need to quit.
             None => break,
         }
@@ -54,7 +57,7 @@ impl<S: UuidStore> UuidResolverActor<S> {

     async fn handle_create(&self, uid: String) -> Result<Uuid> {
         if !is_index_uid_valid(&uid) {
-            return Err(UuidError::BadlyFormatted(uid));
+            return Err(UuidResolverError::BadlyFormatted(uid));
         }
         self.store.create_uuid(uid, true).await
     }
@@ -63,14 +66,14 @@ impl<S: UuidStore> UuidResolverActor<S> {
         self.store
             .get_uuid(uid.clone())
             .await?
-            .ok_or(UuidError::UnexistingIndex(uid))
+            .ok_or(UuidResolverError::UnexistingIndex(uid))
     }

     async fn handle_delete(&self, uid: String) -> Result<Uuid> {
         self.store
             .delete(uid.clone())
             .await?
-            .ok_or(UuidError::UnexistingIndex(uid))
+            .ok_or(UuidResolverError::UnexistingIndex(uid))
     }

     async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
@@ -82,9 +85,13 @@ impl<S: UuidStore> UuidResolverActor<S> {
         self.store.snapshot(path).await
     }

+    async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
+        self.store.dump(path).await
+    }
+
     async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
         if !is_index_uid_valid(&uid) {
-            return Err(UuidError::BadlyFormatted(uid));
+            return Err(UuidResolverError::BadlyFormatted(uid));
         }
         self.store.insert(uid, uuid).await?;
         Ok(())
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
index db4c482bd..981beb0f6 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
@@ -85,4 +85,12 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
             .await
             .expect("Uuid resolver actor has been killed")?)
     }
+    async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
+        let (ret, receiver) = oneshot::channel();
+        let msg = UuidResolveMsg::DumpRequest { ret, path };
+        let _ = self.sender.send(msg).await;
+        Ok(receiver
+            .await
+            .expect("Uuid resolver actor has been killed")?)
+    }
 }
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs
index a72bf0587..2092c67fd 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/message.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs
@@ -34,4 +34,8 @@ pub enum UuidResolveMsg {
     GetSize {
         ret: oneshot::Sender<Result<u64>>,
     },
+    DumpRequest {
+        path: PathBuf,
+        ret: oneshot::Sender<Result<HashSet<Uuid>>>,
+    },
 }
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
index 0cbb2895b..5bddadf02 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs
@@ -1,7 +1,7 @@
 mod actor;
 mod handle_impl;
 mod message;
-mod store;
+pub mod store;

 use std::collections::HashSet;
 use std::path::PathBuf;
@@ -16,12 +16,12 @@ use store::UuidStore;
 #[cfg(test)]
 use mockall::automock;

-pub use store::HeedUuidStore;
 pub use handle_impl::UuidResolverHandleImpl;
+pub use store::HeedUuidStore;

 const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB

-pub type Result<T> = std::result::Result<T, UuidError>;
+pub type Result<T> = std::result::Result<T, UuidResolverError>;

 #[async_trait::async_trait]
 #[cfg_attr(test, automock)]
 pub trait UuidResolverHandle {
@@ -33,20 +33,37 @@ pub trait UuidResolverHandle {
     async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
     async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
+    async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
 }

 #[derive(Debug, Error)]
-pub enum UuidError {
+pub enum UuidResolverError {
     #[error("Name already exist.")]
     NameAlreadyExist,
     #[error("Index \"{0}\" doesn't exist.")]
     UnexistingIndex(String),
-    #[error("Error performing task: {0}")]
-    TokioTask(#[from] tokio::task::JoinError),
-    #[error("Database error: {0}")]
-    Heed(#[from] heed::Error),
-    #[error("Uuid error: {0}")]
-    Uuid(#[from] uuid::Error),
     #[error("Badly formatted index uid: {0}")]
     BadlyFormatted(String),
+    #[error("Internal error resolving index uid: {0}")]
+    Internal(String),
 }
+
+macro_rules! internal_error {
+    ($($other:path), *) => {
+        $(
+            impl From<$other> for UuidResolverError {
+                fn from(other: $other) -> Self {
+                    Self::Internal(other.to_string())
+                }
+            }
+        )*
+    }
+}
+
+internal_error!(
+    heed::Error,
+    uuid::Error,
+    std::io::Error,
+    tokio::task::JoinError,
+    serde_json::Error
+);
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
index a781edcba..1d6ada269 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
@@ -1,18 +1,26 @@
 use std::collections::HashSet;
-use std::fs::create_dir_all;
+use std::fs::{create_dir_all, File};
+use std::io::{BufRead, BufReader, Write};
 use std::path::{Path, PathBuf};

-use heed::{
-    types::{ByteSlice, Str},
-    CompactionOption, Database, Env, EnvOpenOptions,
-};
+use heed::types::{ByteSlice, Str};
+use heed::{CompactionOption, Database, Env, EnvOpenOptions};
+use serde::{Deserialize, Serialize};
 use uuid::Uuid;

-use super::{Result, UuidError, UUID_STORE_SIZE};
+use super::{Result, UuidResolverError, UUID_STORE_SIZE};
 use crate::helpers::EnvSizer;

+#[derive(Serialize, Deserialize)]
+struct DumpEntry {
+    uuid: Uuid,
+    uid: String,
+}
+
+const UUIDS_DB_PATH: &str = "index_uuids";
+
 #[async_trait::async_trait]
-pub trait UuidStore {
+pub trait UuidStore: Sized {
     // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
     // the uuid otherwise.
     async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
@@ -22,6 +30,7 @@ pub trait UuidStore {
     async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
     async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
+    async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
 }

 #[derive(Clone)]
@@ -32,7 +41,7 @@ pub struct HeedUuidStore {

 impl HeedUuidStore {
     pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
-        let path = path.as_ref().join("index_uuids");
+        let path = path.as_ref().join(UUIDS_DB_PATH);
         create_dir_all(&path)?;
         let mut options = EnvOpenOptions::new();
         options.map_size(UUID_STORE_SIZE); // 1GB
@@ -48,7 +57,7 @@ impl HeedUuidStore {
         match db.get(&txn, &name)? {
             Some(uuid) => {
                 if err {
-                    Err(UuidError::NameAlreadyExist)
+                    Err(UuidResolverError::NameAlreadyExist)
                 } else {
                     let uuid = Uuid::from_slice(uuid)?;
                     Ok(uuid)
@@ -62,7 +71,6 @@ impl HeedUuidStore {
             }
         }
     }
-
     pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
         let env = self.env.clone();
         let db = self.db;
@@ -127,7 +135,7 @@ impl HeedUuidStore {

         // only perform snapshot if there are indexes
         if !entries.is_empty() {
-            path.push("index_uuids");
+            path.push(UUIDS_DB_PATH);
             create_dir_all(&path).unwrap();
             path.push("data.mdb");
             env.copy_to_path(path, CompactionOption::Enabled)?;
@@ -138,6 +146,61 @@ impl HeedUuidStore {
     pub fn get_size(&self) -> Result<u64> {
         Ok(self.env.size())
     }
+
+    pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
+        let dump_path = path.join(UUIDS_DB_PATH);
+        create_dir_all(&dump_path)?;
+        let dump_file_path = dump_path.join("data.jsonl");
+        let mut dump_file = File::create(&dump_file_path)?;
+        let mut uuids = HashSet::new();
+
+        let txn = self.env.read_txn()?;
+        for entry in self.db.iter(&txn)? {
+            let (uid, uuid) = entry?;
+            let uid = uid.to_string();
+            let uuid = Uuid::from_slice(uuid)?;
+
+            let entry = DumpEntry { uuid, uid };
+            serde_json::to_writer(&mut dump_file, &entry)?;
+            dump_file.write_all(b"\n")?;
+
+            uuids.insert(uuid);
+        }
+
+        Ok(uuids)
+    }
+
+    pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+        let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
+        std::fs::create_dir_all(&uuid_resolver_path)?;
+
+        let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
+        let indexes = File::open(&src_indexes)?;
+        let mut indexes = BufReader::new(indexes);
+        let mut line = String::new();
+
+        let db = Self::new(dst)?;
+        let mut txn = db.env.write_txn()?;
+
+        loop {
+            match indexes.read_line(&mut line) {
+                Ok(0) => break,
+                Ok(_) => {
+                    let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
+                    println!("importing {} {}", uid, uuid);
+                    db.db.put(&mut txn, &uid, uuid.as_bytes())?;
+                }
+                Err(e) => return Err(e.into()),
+            }
+
+            line.clear();
+        }
+        txn.commit()?;
+
+        db.env.prepare_for_closing().wait();
+
+        Ok(())
+    }
 }

 #[async_trait::async_trait]
@@ -175,4 +238,9 @@ impl UuidStore for HeedUuidStore {
     async fn get_size(&self) -> Result<u64> {
         self.get_size()
     }
+
+    async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
+        let this = self.clone();
+        tokio::task::spawn_blocking(move || this.dump(path)).await?
+    }
 }
diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index e19037482..26b6a784c 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -62,11 +62,11 @@ macro_rules! create_app {
         app.wrap(
             Cors::default()
-            .send_wildcard()
-            .allowed_headers(vec!["content-type", "x-meili-api-key"])
-            .allow_any_origin()
-            .allow_any_method()
-            .max_age(86_400) // 24h
+                .send_wildcard()
+                .allowed_headers(vec!["content-type", "x-meili-api-key"])
+                .allow_any_origin()
+                .allow_any_method()
+                .max_age(86_400), // 24h
         )
         .wrap(middleware::Logger::default())
         .wrap(middleware::Compress::default())
diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs
index 47c081e6f..370eef509 100644
--- a/meilisearch-http/src/routes/dump.rs
+++ b/meilisearch-http/src/routes/dump.rs
@@ -1,20 +1,17 @@
-use actix_web::{post, get, web};
 use actix_web::HttpResponse;
-use serde::{Serialize, Deserialize};
+use actix_web::{get, post, web};
+use serde::{Deserialize, Serialize};

 use crate::error::ResponseError;
 use crate::helpers::Authentication;
 use crate::Data;

 pub fn services(cfg: &mut web::ServiceConfig) {
-    cfg.service(create_dump)
-        .service(get_dump_status);
+    cfg.service(create_dump).service(get_dump_status);
 }

 #[post("/dumps", wrap = "Authentication::Private")]
-async fn create_dump(
-    data: web::Data<Data>,
-) -> Result<HttpResponse, ResponseError> {
+async fn create_dump(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
     let res = data.create_dump().await?;

     Ok(HttpResponse::Accepted().json(res))
diff --git a/meilisearch-http/src/routes/index.rs b/meilisearch-http/src/routes/index.rs
index 62717c90d..4dfe90abf 100644
--- a/meilisearch-http/src/routes/index.rs
+++ b/meilisearch-http/src/routes/index.rs
@@ -1,7 +1,7 @@
 use actix_web::{delete, get, post, put};
 use actix_web::{web, HttpResponse};
 use chrono::{DateTime, Utc};
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};

 use crate::error::ResponseError;
 use crate::helpers::Authentication;
diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs
index 999c4f881..a550064ba 100644
--- a/meilisearch-http/src/routes/mod.rs
+++ b/meilisearch-http/src/routes/mod.rs
@@ -2,6 +2,7 @@ use actix_web::{get, HttpResponse};
 use serde::{Deserialize, Serialize};

 pub mod document;
+pub mod dump;
 pub mod health;
 pub mod index;
 pub mod key;
@@ -9,7 +10,6 @@ pub mod search;
 pub mod settings;
 pub mod stats;
 pub mod synonym;
-pub mod dump;

 #[derive(Deserialize)]
 pub struct IndexParam {
diff --git a/meilisearch-http/src/routes/settings/mod.rs b/meilisearch-http/src/routes/settings/mod.rs
index 03f1ee95c..8ede56046 100644
--- a/meilisearch-http/src/routes/settings/mod.rs
+++ b/meilisearch-http/src/routes/settings/mod.rs
@@ -1,9 +1,9 @@
 use actix_web::{delete, get, post, web, HttpResponse};

-use crate::{error::ResponseError, index::Unchecked};
 use crate::helpers::Authentication;
 use crate::index::Settings;
 use crate::Data;
+use crate::{error::ResponseError, index::Unchecked};

 #[macro_export]
 macro_rules! make_setting_route {
diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs
index adb7fef3e..7d98d0733 100644
--- a/meilisearch-http/tests/common/index.rs
+++ b/meilisearch-http/tests/common/index.rs
@@ -47,7 +47,7 @@ impl Index<'_> {
         update_id as u64
     }

-    pub async fn create(& self, primary_key: Option<&str>) -> (Value, StatusCode) {
+    pub async fn create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
         let body = json!({
             "uid": self.uid,
             "primaryKey": primary_key,
diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs
index 100722ec4..3c50110c3 100644
--- a/meilisearch-http/tests/common/server.rs
+++ b/meilisearch-http/tests/common/server.rs
@@ -44,7 +44,7 @@ impl Server {
     }

     /// Returns a view to an index. There is no guarantee that the index exists.
-    pub fn index(& self, uid: impl AsRef<str>) -> Index<'_> {
+    pub fn index(&self, uid: impl AsRef<str>) -> Index<'_> {
         Index {
             uid: encode(uid.as_ref()),
             service: &self.service,