diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 76933343d..fc406bd1e 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -440,7 +440,13 @@ impl IndexScheduler { Ok(vec![task]) } - Batch::IndexDeletion { index_uid, tasks } => todo!(), + Batch::IndexDeletion { index_uid, tasks } => { + let wtxn = self.env.write_txn()?; + // The write transaction is directly owned and commited here. + let index = self.index_mapper.delete_index(wtxn, &index_uid)?; + + todo!("update the tasks and mark them as succeeded"); + } } } diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 1f786c5f8..00335609c 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -1,9 +1,10 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::fs; use std::path::PathBuf; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, RwLockWriteGuard}; +use std::{fs, thread}; +use log::error; use milli::Index; use uuid::Uuid; @@ -11,6 +12,7 @@ use milli::heed::types::{SerdeBincode, Str}; use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; use milli::update::IndexerConfig; +use self::IndexStatus::{Available, BeingDeleted}; use crate::{Error, Result}; const INDEX_MAPPING: &str = "index-mapping"; @@ -19,9 +21,10 @@ const INDEX_MAPPING: &str = "index-mapping"; pub struct IndexMapper { // Keep track of the opened indexes and is used // mainly by the index resolver. - index_map: Arc>>, + index_map: Arc>>, - // Map an index name with an index uuid currentl available on disk. + // TODO create a UUID Codec that uses the 16 bytes representation + // Map an index name with an index uuid currently available on disk. index_mapping: Database>, base_path: PathBuf, @@ -29,6 +32,16 @@ pub struct IndexMapper { indexer_config: Arc, } +/// Weither the index must not be inserted back +/// or it is available for use. +#[derive(Clone)] +pub enum IndexStatus { + /// Do not insert it back in the index map as it is currently being deleted. + BeingDeleted, + /// You can use the index without worrying about anything. + Available(Index), +} + impl IndexMapper { pub fn new( env: &Env, @@ -47,8 +60,8 @@ impl IndexMapper { /// Get or create the index. pub fn create_index(&self, wtxn: &mut RwTxn, name: &str) -> Result { - let index = match self.index(wtxn, name) { - Ok(index) => index, + match self.index(wtxn, name) { + Ok(index) => Ok(index), Err(Error::IndexNotFound(_)) => { let uuid = Uuid::new_v4(); self.index_mapping.put(wtxn, name, &uuid)?; @@ -57,12 +70,60 @@ impl IndexMapper { fs::create_dir_all(&index_path)?; let mut options = EnvOpenOptions::new(); options.map_size(self.index_size); - milli::Index::new(options, &index_path)? + Ok(milli::Index::new(options, &index_path)?) } - error => return error, + error => error, + } + } + + /// Removes the index from the mapping table and the in-memory index map + /// but keeps the associated tasks. + pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> { + let uuid = self + .index_mapping + .get(&wtxn, name)? + .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; + + // Once we retrieved the UUID of the index we remove it from the mapping table. + assert!(self.index_mapping.delete(&mut wtxn, name)?); + + wtxn.commit()?; + + // We remove the index from the in-memory index map. + let mut lock = self.index_map.write().unwrap(); + let closing_event = match lock.insert(uuid, BeingDeleted) { + Some(Available(index)) => Some(index.prepare_for_closing()), + _ => None, }; - Ok(index) + drop(lock); + + let index_map = self.index_map.clone(); + let index_path = self.base_path.join(uuid.to_string()); + let index_name = name.to_string(); + thread::spawn(move || { + // We first wait to be sure that the previously opened index is effectively closed. + // This can take a lot of time, this is why we do that in a seperate thread. + if let Some(closing_event) = closing_event { + closing_event.wait(); + } + + // Then we remove the content from disk. + if let Err(e) = fs::remove_dir_all(&index_path) { + error!( + "An error happened when deleting the index {} ({}): {}", + index_name, uuid, e + ); + } + + // Finally we remove the entry from the index map. + assert!(matches!( + index_map.write().unwrap().remove(&uuid), + Some(BeingDeleted) + )); + }); + + Ok(()) } /// Return an index, may open it if it wasn't already opened. @@ -75,7 +136,8 @@ impl IndexMapper { // we clone here to drop the lock before entering the match let index = self.index_map.read().unwrap().get(&uuid).cloned(); let index = match index { - Some(index) => index, + Some(Available(index)) => index, + Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), // since we're lazy, it's possible that the index has not been opened yet. None => { let mut index_map = self.index_map.write().unwrap(); @@ -92,11 +154,13 @@ impl IndexMapper { let mut options = EnvOpenOptions::new(); options.map_size(self.index_size); let index = milli::Index::new(options, &index_path)?; - - entry.insert(index.clone()); + entry.insert(Available(index.clone())); index } - Entry::Occupied(entry) => entry.get().clone(), + Entry::Occupied(entry) => match entry.get() { + Available(index) => index.clone(), + BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), + }, } } };