From 30d2b24689d9b132d3d408b3a8c95aa2dc2b987a Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 7 Sep 2022 21:27:06 +0200 Subject: [PATCH] implements the index deletion, creation and swap --- index-scheduler/src/error.rs | 6 ++-- index-scheduler/src/index/index.rs | 10 ++++++ index-scheduler/src/index/mod.rs | 7 ++++ index-scheduler/src/lib.rs | 54 +++++++++++++++++++++++++++--- 4 files changed, 70 insertions(+), 7 deletions(-) diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 563e5a7d3..faf63497c 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -5,9 +5,9 @@ use crate::index; #[derive(Error, Debug)] pub enum Error { - #[error("Index `{}` not found", .0)] + #[error("Index `{0}` not found")] IndexNotFound(String), - #[error("Index `{}` already exists", .0)] + #[error("Index `{0}` already exists")] IndexAlreadyExists(String), #[error("Corrupted task queue.")] CorruptedTaskQueue, @@ -17,6 +17,8 @@ pub enum Error { Milli(#[from] milli::Error), #[error("{0}")] IndexError(#[from] index::error::IndexError), + #[error(transparent)] + IoError(#[from] std::io::Error), #[error(transparent)] Anyhow(#[from] anyhow::Error), diff --git a/index-scheduler/src/index/index.rs b/index-scheduler/src/index/index.rs index 36f195abe..7ee4b712b 100644 --- a/index-scheduler/src/index/index.rs +++ b/index-scheduler/src/index/index.rs @@ -22,6 +22,7 @@ use super::{Checked, Settings}; pub type Document = Map; +// @kero, what is this structure? Shouldn't it move entirely to milli? #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct IndexMeta { @@ -50,6 +51,7 @@ impl IndexMeta { } } +// @kero Maybe this should be entirely generated somewhere else since it doesn't really concern the index? #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct IndexStats { @@ -105,6 +107,14 @@ impl Index { self.inner.as_ref().clone().prepare_for_closing(); } + pub fn delete(self) -> Result<()> { + let path = self.path().to_path_buf(); + self.inner.as_ref().clone().prepare_for_closing().wait(); + std::fs::remove_file(path)?; + + Ok(()) + } + pub fn stats(&self) -> Result { let rtxn = self.read_txn()?; diff --git a/index-scheduler/src/index/mod.rs b/index-scheduler/src/index/mod.rs index 505417dca..cd9ed1b69 100644 --- a/index-scheduler/src/index/mod.rs +++ b/index-scheduler/src/index/mod.rs @@ -137,6 +137,13 @@ pub mod test { } } + pub fn delete(self) -> Result<()> { + match self { + MockIndex::Real(index) => index.delete(), + MockIndex::Mock(m) => unsafe { m.get("delete").call(()) }, + } + } + pub fn perform_search(&self, query: SearchQuery) -> Result { match self { MockIndex::Real(index) => index.perform_search(query), diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 0706c73b6..dfd88888f 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -222,6 +222,14 @@ impl IndexScheduler { // TODO: TAMO: do this later // self.handle_batch_result(res); + + match wtxn.commit() { + Ok(()) => log::info!("A batch of tasks was successfully completed."), + Err(e) => { + log::error!("{}", e); + continue; + } + } } } @@ -253,15 +261,37 @@ impl IndexScheduler { } self.available_index.put(wtxn, &index_name, &true); - // let index = - todo!("tamo: once I get index.rs to works"); + // TODO: TAMO: give real info to the index + let index = Index::open( + index_name.to_string(), + index_name.to_string(), + 100_000_000, + Arc::default(), + )?; + if let Some(primary_key) = primary_key { + index.update_primary_key(primary_key.to_string())?; + } + self.index_map + .write() + .map_err(|_| Error::CorruptedTaskQueue)? + .insert(index_name.to_string(), index.clone()); } KindWithContent::DeleteIndex { index_name } => { self.index_map.write(); if !self.available_index.delete(wtxn, &index_name)? { return Err(Error::IndexNotFound(index_name.to_string())); } - todo!("tamo: once I get index.rs to works"); + if let Some(index) = self + .index_map + .write() + .map_err(|_| Error::CorruptedTaskQueue)? + .remove(index_name) + { + index.delete()?; + } else { + // TODO: TAMO: fix the path + std::fs::remove_file(index_name)?; + } } KindWithContent::SwapIndex { lhs, rhs } => { if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) { @@ -271,12 +301,26 @@ impl IndexScheduler { return Err(Error::IndexNotFound(rhs.to_string())); } - let index_map = self + let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?; + let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?; + // the bitmap are lazily created and thus may not exists. + if let Some(bitmap) = rhs_bitmap { + self.index_tasks.put(wtxn, lhs, &bitmap)?; + } + if let Some(bitmap) = lhs_bitmap { + self.index_tasks.put(wtxn, rhs, &bitmap)?; + } + + let mut index_map = self .index_map .write() .map_err(|_| Error::CorruptedTaskQueue)?; - // index_map.remove. + let lhs_index = index_map.remove(lhs).unwrap(); + let rhs_index = index_map.remove(rhs).unwrap(); + + index_map.insert(lhs.to_string(), rhs_index); + index_map.insert(rhs.to_string(), lhs_index); } _ => unreachable!(), },