diff --git a/meilisearch-http/src/index_controller/actor_index_controller/mod.rs b/meilisearch-http/src/index_controller/actor_index_controller/mod.rs deleted file mode 100644 index 188d85580..000000000 --- a/meilisearch-http/src/index_controller/actor_index_controller/mod.rs +++ /dev/null @@ -1,127 +0,0 @@ -mod index_actor; -mod update_actor; -mod uuid_resolver; -mod update_store; -mod update_handler; - -use std::path::Path; - -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; -use super::IndexMetadata; -use futures::stream::StreamExt; -use actix_web::web::Payload; -use super::UpdateMeta; -use crate::index::{SearchResult, SearchQuery}; -use actix_web::web::Bytes; - -use crate::index::Settings; -use super::UpdateStatus; - -pub struct IndexController { - uuid_resolver: uuid_resolver::UuidResolverHandle, - index_handle: index_actor::IndexActorHandle, - update_handle: update_actor::UpdateActorHandle, -} - -enum IndexControllerMsg { - CreateIndex { - uuid: Uuid, - primary_key: Option, - ret: oneshot::Sender>, - }, - Shutdown, -} - -impl IndexController { - pub fn new(path: impl AsRef) -> Self { - let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); - let index_actor = index_actor::IndexActorHandle::new(&path); - let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path); - Self { uuid_resolver, index_handle: index_actor, update_handle } - } - - pub async fn add_documents( - &self, - index: String, - method: milli::update::IndexDocumentsMethod, - format: milli::update::UpdateFormat, - mut payload: Payload, - primary_key: Option, - ) -> anyhow::Result { - let uuid = self.uuid_resolver.get_or_create(index).await?; - let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; - let (sender, receiver) = mpsc::channel(10); - - // It is necessary to spawn a local task to senf the payload to the update handle to - // prevent dead_locking between the update_handle::update that waits for the update to be - // registered and the update_actor that waits for the the payload to be sent to it. - tokio::task::spawn_local(async move { - while let Some(bytes) = payload.next().await { - match bytes { - Ok(bytes) => { sender.send(Ok(bytes)).await; }, - Err(e) => { - let error: Box = Box::new(e); - sender.send(Err(error)).await; }, - } - } - }); - - // This must be done *AFTER* spawning the task. - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) - } - - fn clear_documents(&self, index: String) -> anyhow::Result { - todo!() - } - - fn delete_documents(&self, index: String, document_ids: Vec) -> anyhow::Result { - todo!() - } - - fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result { - todo!() - } - - pub async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result { - let super::IndexSettings { name, primary_key } = index_settings; - let uuid = self.uuid_resolver.create(name.unwrap()).await?; - let index_meta = self.index_handle.create_index(uuid, primary_key).await?; - Ok(index_meta) - } - - fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { - todo!() - } - - fn swap_indices(&self, index1_uid: String, index2_uid: String) -> anyhow::Result<()> { - todo!() - } - - pub fn index(&self, name: String) -> anyhow::Result>> { - todo!() - } - - fn update_status(&self, index: String, id: u64) -> anyhow::Result> { - todo!() - } - - fn all_update_status(&self, index: String) -> anyhow::Result> { - todo!() - } - - pub fn list_indexes(&self) -> anyhow::Result> { - todo!() - } - - fn update_index(&self, name: String, index_settings: super::IndexSettings) -> anyhow::Result { - todo!() - } - - pub async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(name).await.unwrap().unwrap(); - let result = self.index_handle.search(uuid, query).await?; - Ok(result) - } -} diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs index 63be1eec5..52a69b964 100644 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ b/meilisearch-http/src/index_controller/index_actor.rs @@ -357,7 +357,6 @@ impl IndexActor { async fn handle_get_meta(&self, uuid: Uuid) -> Result> { match self.store.get(uuid).await? { Some(index) => { - println!("geting meta yoyo"); let meta = spawn_blocking(move || IndexMeta::new(&index)) .await .map_err(|e| IndexError::Error(e.into()))??; diff --git a/meilisearch-http/src/index_controller/local_index_controller/index_store.rs b/meilisearch-http/src/index_controller/local_index_controller/index_store.rs deleted file mode 100644 index a690abaf4..000000000 --- a/meilisearch-http/src/index_controller/local_index_controller/index_store.rs +++ /dev/null @@ -1,606 +0,0 @@ -use std::fs::{create_dir_all, remove_dir_all}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::Duration; - -use anyhow::{bail, Context}; -use chrono::{DateTime, Utc}; -use dashmap::{mapref::entry::Entry, DashMap}; -use heed::{ - types::{ByteSlice, SerdeJson, Str}, - Database, Env, EnvOpenOptions, RoTxn, RwTxn, -}; -use log::{error, info}; -use milli::Index; -use rayon::ThreadPool; use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use super::update_handler::UpdateHandler; -use super::{UpdateMeta, UpdateResult}; -use crate::option::IndexerOpts; - -type UpdateStore = super::update_store::UpdateStore; - -#[derive(Serialize, Deserialize, Debug, PartialEq)] -pub struct IndexMeta { - update_store_size: u64, - index_store_size: u64, - pub uuid: Uuid, - pub created_at: DateTime, - pub updated_at: DateTime, -} - -impl IndexMeta { - fn open( - &self, - path: impl AsRef, - thread_pool: Arc, - indexer_options: &IndexerOpts, - ) -> anyhow::Result<(Arc, Arc)> { - let update_path = make_update_db_path(&path, &self.uuid); - let index_path = make_index_db_path(&path, &self.uuid); - - create_dir_all(&update_path)?; - create_dir_all(&index_path)?; - - let mut options = EnvOpenOptions::new(); - options.map_size(self.index_store_size as usize); - let index = Arc::new(Index::new(options, index_path)?); - - let mut options = EnvOpenOptions::new(); - options.map_size(self.update_store_size as usize); - let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?; - let update_store = UpdateStore::open(options, update_path, handler)?; - - Ok((index, update_store)) - } -} - -pub struct IndexStore { - env: Env, - name_to_uuid: Database, - uuid_to_index: DashMap, Arc)>, - uuid_to_index_meta: Database>, - - thread_pool: Arc, - indexer_options: IndexerOpts, -} - -impl IndexStore { - pub fn new(path: impl AsRef, indexer_options: IndexerOpts) -> anyhow::Result { - let env = EnvOpenOptions::new() - .map_size(4096 * 100) - .max_dbs(2) - .open(path)?; - - let uuid_to_index = DashMap::new(); - let name_to_uuid = open_or_create_database(&env, Some("name_to_uid"))?; - let uuid_to_index_meta = open_or_create_database(&env, Some("uid_to_index_db"))?; - - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(indexer_options.indexing_jobs.unwrap_or(0)) - .build()?; - let thread_pool = Arc::new(thread_pool); - - Ok(Self { - env, - name_to_uuid, - uuid_to_index, - uuid_to_index_meta, - - thread_pool, - indexer_options, - }) - } - - pub fn delete(&self, index_uid: impl AsRef) -> anyhow::Result<()> { - // we remove the references to the index from the index map so it is not accessible anymore - let mut txn = self.env.write_txn()?; - let uuid = self - .index_uuid(&txn, &index_uid)? - .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?; - self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?; - self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?; - txn.commit()?; - // If the index was loaded (i.e it is present in the uuid_to_index map), then we need to - // close it. The process goes as follow: - // - // 1) We want to remove any pending updates from the store. - // 2) We try to get ownership on the update store so we can close it. It may take a - // couple of tries, but since the update store event loop only has a weak reference to - // itself, and we are the only other function holding a reference to it otherwise, we will - // get it eventually. - // 3) We request a closing of the update store. - // 4) We can take ownership on the index, and close it. - // 5) We remove all the files from the file system. - let index_uid = index_uid.as_ref().to_string(); - let path = self.env.path().to_owned(); - if let Some((_, (index, updates))) = self.uuid_to_index.remove(&uuid) { - std::thread::spawn(move || { - info!("Preparing for {:?} deletion.", index_uid); - // this error is non fatal, but may delay the deletion. - if let Err(e) = updates.abort_pendings() { - error!( - "error aborting pending updates when deleting index {:?}: {}", - index_uid, e - ); - } - let updates = get_arc_ownership_blocking(updates); - let close_event = updates.prepare_for_closing(); - close_event.wait(); - info!("closed update store for {:?}", index_uid); - - let index = get_arc_ownership_blocking(index); - let close_event = index.prepare_for_closing(); - close_event.wait(); - - let update_path = make_update_db_path(&path, &uuid); - let index_path = make_index_db_path(&path, &uuid); - - if let Err(e) = remove_dir_all(index_path) { - error!("error removing index {:?}: {}", index_uid, e); - } - - if let Err(e) = remove_dir_all(update_path) { - error!("error removing index {:?}: {}", index_uid, e); - } - - info!("index {:?} deleted.", index_uid); - }); - } - - Ok(()) - } - - fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { - match self.name_to_uuid.get(txn, name.as_ref())? { - Some(bytes) => { - let uuid = Uuid::from_slice(bytes)?; - Ok(Some(uuid)) - } - None => Ok(None), - } - } - - fn retrieve_index( - &self, - txn: &RoTxn, - uid: Uuid, - ) -> anyhow::Result, Arc)>> { - match self.uuid_to_index.entry(uid.clone()) { - Entry::Vacant(entry) => match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { - Some(meta) => { - let path = self.env.path(); - let (index, updates) = - meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; - entry.insert((index.clone(), updates.clone())); - Ok(Some((index, updates))) - } - None => Ok(None), - }, - Entry::Occupied(entry) => { - let (index, updates) = entry.get(); - Ok(Some((index.clone(), updates.clone()))) - } - } - } - - fn get_index_txn( - &self, - txn: &RoTxn, - name: impl AsRef, - ) -> anyhow::Result, Arc)>> { - match self.index_uuid(&txn, name)? { - Some(uid) => self.retrieve_index(&txn, uid), - None => Ok(None), - } - } - - pub fn index( - &self, - name: impl AsRef, - ) -> anyhow::Result, Arc)>> { - let txn = self.env.read_txn()?; - self.get_index_txn(&txn, name) - } - - /// Use this function to perform an update on an index. - /// This function also puts a lock on what index is allowed to perform an update. - pub fn update_index(&self, name: impl AsRef, f: F) -> anyhow::Result<(T, IndexMeta)> - where - F: FnOnce(&Index) -> anyhow::Result, - { - let mut txn = self.env.write_txn()?; - let (index, _) = self - .get_index_txn(&txn, &name)? - .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; - let result = f(index.as_ref()); - match result { - Ok(ret) => { - let meta = self.update_meta(&mut txn, name, |meta| meta.updated_at = Utc::now())?; - txn.commit()?; - Ok((ret, meta)) - } - Err(e) => Err(e), - } - } - - pub fn index_with_meta( - &self, - name: impl AsRef, - ) -> anyhow::Result, IndexMeta)>> { - let txn = self.env.read_txn()?; - let uuid = self.index_uuid(&txn, &name)?; - match uuid { - Some(uuid) => { - let meta = self - .uuid_to_index_meta - .get(&txn, uuid.as_bytes())? - .with_context(|| { - format!("unable to retrieve metadata for index {:?}", name.as_ref()) - })?; - let (index, _) = self - .retrieve_index(&txn, uuid)? - .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?; - Ok(Some((index, meta))) - } - None => Ok(None), - } - } - - fn update_meta( - &self, - txn: &mut RwTxn, - name: impl AsRef, - f: F, - ) -> anyhow::Result - where - F: FnOnce(&mut IndexMeta), - { - let uuid = self - .index_uuid(txn, &name)? - .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; - let mut meta = self - .uuid_to_index_meta - .get(txn, uuid.as_bytes())? - .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?; - f(&mut meta); - self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; - Ok(meta) - } - - pub fn get_or_create_index( - &self, - name: impl AsRef, - update_size: u64, - index_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { - let mut txn = self.env.write_txn()?; - match self.get_index_txn(&txn, name.as_ref())? { - Some(res) => Ok(res), - None => { - let uuid = Uuid::new_v4(); - let (index, updates, _) = - self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; - // If we fail to commit the transaction, we must delete the database from the - // file-system. - if let Err(e) = txn.commit() { - self.clean_db(uuid); - return Err(e)?; - } - Ok((index, updates)) - } - } - } - - // Remove all the files and data associated with a db uuid. - fn clean_db(&self, uuid: Uuid) { - let update_db_path = make_update_db_path(self.env.path(), &uuid); - let index_db_path = make_index_db_path(self.env.path(), &uuid); - - remove_dir_all(update_db_path).expect("Failed to clean database"); - remove_dir_all(index_db_path).expect("Failed to clean database"); - - self.uuid_to_index.remove(&uuid); - } - - fn create_index_txn( - &self, - txn: &mut RwTxn, - uuid: Uuid, - name: impl AsRef, - update_store_size: u64, - index_store_size: u64, - ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { - let created_at = Utc::now(); - let updated_at = created_at; - let meta = IndexMeta { - update_store_size, - index_store_size, - uuid: uuid.clone(), - created_at, - updated_at, - }; - - self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?; - self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; - - let path = self.env.path(); - let (index, update_store) = - match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { - Ok(res) => res, - Err(e) => { - self.clean_db(uuid); - return Err(e); - } - }; - - self.uuid_to_index - .insert(uuid, (index.clone(), update_store.clone())); - - Ok((index, update_store, meta)) - } - - /// Same as `get_or_create`, but returns an error if the index already exists. - pub fn create_index( - &self, - name: impl AsRef, - update_size: u64, - index_size: u64, - ) -> anyhow::Result<(Arc, Arc, IndexMeta)> { - let uuid = Uuid::new_v4(); - let mut txn = self.env.write_txn()?; - - if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() { - bail!("index {:?} already exists", name.as_ref()) - } - - let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; - // If we fail to commit the transaction, we must delete the database from the - // file-system. - if let Err(e) = txn.commit() { - self.clean_db(uuid); - return Err(e)?; - } - Ok(result) - } - - /// Returns each index associated with its metadata: - /// (index_name, IndexMeta, primary_key) - /// This method will force all the indexes to be loaded. - pub fn list_indexes(&self) -> anyhow::Result)>> { - let txn = self.env.read_txn()?; - let metas = self.name_to_uuid.iter(&txn)?.filter_map(|entry| { - entry - .map_err(|e| { - error!("error decoding entry while listing indexes: {}", e); - e - }) - .ok() - }); - let mut indexes = Vec::new(); - for (name, uuid) in metas { - // get index to retrieve primary key - let (index, _) = self - .get_index_txn(&txn, name)? - .with_context(|| format!("could not load index {:?}", name))?; - let primary_key = index.primary_key(&index.read_txn()?)?.map(String::from); - // retieve meta - let meta = self - .uuid_to_index_meta - .get(&txn, &uuid)? - .with_context(|| format!("could not retieve meta for index {:?}", name))?; - indexes.push((name.to_owned(), meta, primary_key)); - } - Ok(indexes) - } -} - -// Loops on an arc to get ownership on the wrapped value. This method sleeps 100ms before retrying. -fn get_arc_ownership_blocking(mut item: Arc) -> T { - loop { - match Arc::try_unwrap(item) { - Ok(item) => return item, - Err(item_arc) => { - item = item_arc; - std::thread::sleep(Duration::from_millis(100)); - continue; - } - } - } -} - -fn open_or_create_database( - env: &Env, - name: Option<&str>, -) -> anyhow::Result> { - match env.open_database::(name)? { - Some(db) => Ok(db), - None => Ok(env.create_database::(name)?), - } -} - -fn make_update_db_path(path: impl AsRef, uuid: &Uuid) -> PathBuf { - let mut path = path.as_ref().to_path_buf(); - path.push(format!("update{}", uuid)); - path -} - -fn make_index_db_path(path: impl AsRef, uuid: &Uuid) -> PathBuf { - let mut path = path.as_ref().to_path_buf(); - path.push(format!("index{}", uuid)); - path -} - -#[cfg(test)] -mod test { - use super::*; - use std::path::PathBuf; - - #[test] - fn test_make_update_db_path() { - let uuid = Uuid::new_v4(); - assert_eq!( - make_update_db_path("/home", &uuid), - PathBuf::from(format!("/home/update{}", uuid)) - ); - } - - #[test] - fn test_make_index_db_path() { - let uuid = Uuid::new_v4(); - assert_eq!( - make_index_db_path("/home", &uuid), - PathBuf::from(format!("/home/index{}", uuid)) - ); - } - - mod index_store { - use super::*; - - #[test] - fn test_index_uuid() { - let temp = tempfile::tempdir().unwrap(); - let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); - - let name = "foobar"; - let txn = store.env.read_txn().unwrap(); - // name is not found if the uuid in not present in the db - assert!(store.index_uuid(&txn, &name).unwrap().is_none()); - drop(txn); - - // insert an uuid in the the name_to_uuid_db: - let uuid = Uuid::new_v4(); - let mut txn = store.env.write_txn().unwrap(); - store - .name_to_uuid - .put(&mut txn, &name, uuid.as_bytes()) - .unwrap(); - txn.commit().unwrap(); - - // check that the uuid is there - let txn = store.env.read_txn().unwrap(); - assert_eq!(store.index_uuid(&txn, &name).unwrap(), Some(uuid)); - } - - #[test] - fn test_retrieve_index() { - let temp = tempfile::tempdir().unwrap(); - let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); - let uuid = Uuid::new_v4(); - - let txn = store.env.read_txn().unwrap(); - assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); - - let created_at = Utc::now(); - let updated_at = created_at; - - let meta = IndexMeta { - update_store_size: 4096 * 100, - index_store_size: 4096 * 100, - uuid: uuid.clone(), - created_at, - updated_at, - }; - let mut txn = store.env.write_txn().unwrap(); - store - .uuid_to_index_meta - .put(&mut txn, uuid.as_bytes(), &meta) - .unwrap(); - txn.commit().unwrap(); - - // the index cache should be empty - assert!(store.uuid_to_index.is_empty()); - - let txn = store.env.read_txn().unwrap(); - assert!(store.retrieve_index(&txn, uuid).unwrap().is_some()); - assert_eq!(store.uuid_to_index.len(), 1); - } - - #[test] - fn test_index() { - let temp = tempfile::tempdir().unwrap(); - let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); - let name = "foobar"; - - assert!(store.index(&name).unwrap().is_none()); - - let created_at = Utc::now(); - let updated_at = created_at; - - let uuid = Uuid::new_v4(); - let meta = IndexMeta { - update_store_size: 4096 * 100, - index_store_size: 4096 * 100, - uuid: uuid.clone(), - created_at, - updated_at, - }; - let mut txn = store.env.write_txn().unwrap(); - store - .name_to_uuid - .put(&mut txn, &name, uuid.as_bytes()) - .unwrap(); - store - .uuid_to_index_meta - .put(&mut txn, uuid.as_bytes(), &meta) - .unwrap(); - txn.commit().unwrap(); - - assert!(store.index(&name).unwrap().is_some()); - } - - #[test] - fn test_get_or_create_index() { - let temp = tempfile::tempdir().unwrap(); - let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); - let name = "foobar"; - - let update_store_size = 4096 * 100; - let index_store_size = 4096 * 100; - store - .get_or_create_index(&name, update_store_size, index_store_size) - .unwrap(); - let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); - assert_eq!(store.uuid_to_index.len(), 1); - assert!(uuid.is_some()); - let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store - .uuid_to_index_meta - .get(&txn, uuid.as_bytes()) - .unwrap() - .unwrap(); - assert_eq!(meta.update_store_size, update_store_size); - assert_eq!(meta.index_store_size, index_store_size); - assert_eq!(meta.uuid, uuid); - } - - #[test] - fn test_create_index() { - let temp = tempfile::tempdir().unwrap(); - let store = IndexStore::new(temp, IndexerOpts::default()).unwrap(); - let name = "foobar"; - - let update_store_size = 4096 * 100; - let index_store_size = 4096 * 100; - let uuid = Uuid::new_v4(); - let mut txn = store.env.write_txn().unwrap(); - store - .create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size) - .unwrap(); - let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); - assert_eq!(store.uuid_to_index.len(), 1); - assert!(uuid.is_some()); - let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store - .uuid_to_index_meta - .get(&txn, uuid.as_bytes()) - .unwrap() - .unwrap(); - assert_eq!(meta.update_store_size, update_store_size); - assert_eq!(meta.index_store_size, index_store_size); - assert_eq!(meta.uuid, uuid); - } - } -} diff --git a/meilisearch-http/src/index_controller/local_index_controller/mod.rs b/meilisearch-http/src/index_controller/local_index_controller/mod.rs deleted file mode 100644 index 8ac600f5f..000000000 --- a/meilisearch-http/src/index_controller/local_index_controller/mod.rs +++ /dev/null @@ -1,228 +0,0 @@ -mod update_store; -mod index_store; -mod update_handler; - -use std::path::Path; -use std::sync::Arc; - -use anyhow::{bail, Context}; -use itertools::Itertools; -use milli::Index; - -use crate::option::IndexerOpts; -use index_store::IndexStore; -use super::IndexController; -use super::updates::UpdateStatus; -use super::{UpdateMeta, UpdateResult, IndexMetadata, IndexSettings}; - -pub struct LocalIndexController { - indexes: IndexStore, - update_db_size: u64, - index_db_size: u64, -} - -impl LocalIndexController { - pub fn new( - path: impl AsRef, - opt: IndexerOpts, - index_db_size: u64, - update_db_size: u64, - ) -> anyhow::Result { - let indexes = IndexStore::new(path, opt)?; - Ok(Self { indexes, index_db_size, update_db_size }) - } -} - -impl IndexController for LocalIndexController { - fn add_documents>( - &self, - index: S, - method: milli::update::IndexDocumentsMethod, - format: milli::update::UpdateFormat, - data: &[u8], - primary_key: Option, - ) -> anyhow::Result> { - let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; - let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; - let pending = update_store.register_update(meta, data)?; - Ok(pending.into()) - } - - fn update_settings>( - &self, - index: S, - settings: super::Settings - ) -> anyhow::Result> { - let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; - let meta = UpdateMeta::Settings(settings); - let pending = update_store.register_update(meta, &[])?; - Ok(pending.into()) - } - - fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result { - let index_name = index_settings.name.context("Missing name for index")?; - let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?; - if let Some(ref primary_key) = index_settings.primary_key { - if let Err(e) = update_primary_key(index, primary_key).context("error creating index") { - // TODO: creating index could not be completed, delete everything. - Err(e)? - } - } - - let meta = IndexMetadata { - uid: index_name, - uuid: meta.uuid.clone(), - created_at: meta.created_at, - updated_at: meta.created_at, - primary_key: index_settings.primary_key, - }; - - Ok(meta) - } - - fn delete_index>(&self, index_uid: S) -> anyhow::Result<()> { - self.indexes.delete(index_uid) - } - - fn swap_indices, S2: AsRef>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> { - todo!() - } - - fn index(&self, name: impl AsRef) -> anyhow::Result>> { - let index = self.indexes.index(name)?.map(|(i, _)| i); - Ok(index) - } - - fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>> { - match self.indexes.index(&index)? { - Some((_, update_store)) => Ok(update_store.meta(id)?), - None => bail!("index {:?} doesn't exist", index.as_ref()), - } - } - - fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>> { - match self.indexes.index(&index)? { - Some((_, update_store)) => { - let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| { - Ok(processing - .map(UpdateStatus::from) - .into_iter() - .chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u))) - .chain(aborted.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u))) - .chain(processed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u))) - .chain(failed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u))) - .sorted_by(|a, b| a.id().cmp(&b.id())) - .collect()) - })?; - Ok(updates) - } - None => bail!("index {} doesn't exist.", index.as_ref()), - } - - } - - fn list_indexes(&self) -> anyhow::Result> { - let metas = self.indexes.list_indexes()?; - let mut output_meta = Vec::new(); - for (uid, meta, primary_key) in metas { - let created_at = meta.created_at; - let uuid = meta.uuid; - let updated_at = self - .all_update_status(&uid)? - .iter() - .filter_map(|u| u.processed().map(|u| u.processed_at)) - .max() - .unwrap_or(created_at); - - let index_meta = IndexMetadata { - uid, - created_at, - updated_at, - uuid, - primary_key, - }; - output_meta.push(index_meta); - } - Ok(output_meta) - } - - fn update_index(&self, uid: impl AsRef, index_settings: IndexSettings) -> anyhow::Result { - if index_settings.name.is_some() { - bail!("can't update an index name.") - } - - let (primary_key, meta) = match index_settings.primary_key { - Some(ref primary_key) => { - self.indexes - .update_index(&uid, |index| { - let mut txn = index.write_txn()?; - if index.primary_key(&txn)?.is_some() { - bail!("primary key already exists.") - } - index.put_primary_key(&mut txn, primary_key)?; - txn.commit()?; - Ok(Some(primary_key.clone())) - })? - }, - None => { - let (index, meta) = self.indexes - .index_with_meta(&uid)? - .with_context(|| format!("index {:?} doesn't exist.", uid.as_ref()))?; - let primary_key = index - .primary_key(&index.read_txn()?)? - .map(String::from); - (primary_key, meta) - }, - }; - - Ok(IndexMetadata { - uid: uid.as_ref().to_string(), - uuid: meta.uuid.clone(), - created_at: meta.created_at, - updated_at: meta.updated_at, - primary_key, - }) - } - - fn clear_documents(&self, index: impl AsRef) -> anyhow::Result { - let (_, update_store) = self.indexes.index(&index)? - .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; - let meta = UpdateMeta::ClearDocuments; - let pending = update_store.register_update(meta, &[])?; - Ok(pending.into()) - } - - fn delete_documents(&self, index: impl AsRef, document_ids: Vec) -> anyhow::Result { - let (_, update_store) = self.indexes.index(&index)? - .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; - let meta = UpdateMeta::DeleteDocuments; - let content = serde_json::to_vec(&document_ids)?; - let pending = update_store.register_update(meta, &content)?; - Ok(pending.into()) - } -} - -fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { - let index = index.as_ref(); - let mut txn = index.write_txn()?; - if index.primary_key(&txn)?.is_some() { - bail!("primary key already set.") - } - index.put_primary_key(&mut txn, primary_key.as_ref())?; - txn.commit()?; - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use tempfile::tempdir; - use crate::make_index_controller_tests; - - make_index_controller_tests!({ - let options = IndexerOpts::default(); - let path = tempdir().unwrap(); - let size = 4096 * 100; - LocalIndexController::new(path, options, size, size).unwrap() - }); -} diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs index 63724af18..37b06f46f 100644 --- a/meilisearch-http/tests/documents/add_documents.rs +++ b/meilisearch-http/tests/documents/add_documents.rs @@ -28,7 +28,6 @@ async fn add_documents_no_index_creation() { let (response, code) = index.get_update(0).await; assert_eq!(code, 200); - println!("response: {}", response); assert_eq!(response["status"], "processed"); assert_eq!(response["updateId"], 0); assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 1); diff --git a/meilisearch-http/tests/index/create_index.rs b/meilisearch-http/tests/index/create_index.rs index 718e35899..0bf2f15a5 100644 --- a/meilisearch-http/tests/index/create_index.rs +++ b/meilisearch-http/tests/index/create_index.rs @@ -7,7 +7,6 @@ async fn create_index_no_primary_key() { let index = server.index("test"); let (response, code) = index.create(None).await; - println!("response: {}", response); assert_eq!(code, 200); assert_eq!(response["uid"], "test"); diff --git a/meilisearch-http/tests/index/delete_index.rs b/meilisearch-http/tests/index/delete_index.rs index b15cc306e..5bc78950f 100644 --- a/meilisearch-http/tests/index/delete_index.rs +++ b/meilisearch-http/tests/index/delete_index.rs @@ -6,13 +6,11 @@ async fn create_and_delete_index() { let index = server.index("test"); let (_response, code) = index.create(None).await; - println!("response: {}", _response); assert_eq!(code, 200); let (_response, code) = index.delete().await; - println!("response: {}", _response); assert_eq!(code, 200); diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs index b8217c5f7..feefe830f 100644 --- a/meilisearch-http/tests/settings/get_settings.rs +++ b/meilisearch-http/tests/settings/get_settings.rs @@ -36,7 +36,6 @@ async fn test_partial_update() { let server = Server::new().await; let index = server.index("test"); let (response, _code) = index.update_settings(json!({"displayedAttributes": ["foo"]})).await; - println!("response: {}", response); index.wait_update_id(0).await; let (response, code) = index.settings().await; assert_eq!(code, 200); @@ -44,7 +43,6 @@ async fn test_partial_update() { assert_eq!(response["searchableAttributes"],json!(["*"])); let (response, _) = index.update_settings(json!({"searchableAttributes": ["bar"]})).await; - println!("resp: {}", response); index.wait_update_id(1).await; let (response, code) = index.settings().await; @@ -96,7 +94,6 @@ async fn update_setting_unexisting_index_invalid_uid() { let server = Server::new().await; let index = server.index("test##! "); let (_response, code) = index.update_settings(json!({})).await; - println!("response: {}", _response); assert_eq!(code, 400); } diff --git a/meilisearch-http/tests/updates/mod.rs b/meilisearch-http/tests/updates/mod.rs index 713936b8c..64b5b560e 100644 --- a/meilisearch-http/tests/updates/mod.rs +++ b/meilisearch-http/tests/updates/mod.rs @@ -46,7 +46,6 @@ async fn list_no_updates() { let index = server.index("test"); index.create(None).await; let (response, code) = index.list_updates().await; - println!("response: {}", response); assert_eq!(code, 200); assert!(response.as_array().unwrap().is_empty()); }