From f3fc0bed45354da2ff54ff9b0f5471e6b074ec72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 6 Nov 2019 10:49:13 +0100 Subject: [PATCH] Introduce index soft deletion --- meilidb-core/src/database.rs | 42 ++++++++++++++++--- meilidb-core/src/store/main.rs | 4 ++ meilidb-core/src/store/mod.rs | 29 ++++++++++--- meilidb-core/src/store/synonyms.rs | 4 ++ meilidb-core/src/store/updates.rs | 4 ++ meilidb-core/src/store/updates_results.rs | 4 ++ meilidb-core/src/update/documents_addition.rs | 9 ++-- meilidb-core/src/update/documents_deletion.rs | 7 ++-- .../src/update/stop_words_addition.rs | 7 ++-- .../src/update/stop_words_deletion.rs | 7 ++-- meilidb-core/src/update/synonyms_addition.rs | 7 ++-- meilidb-core/src/update/synonyms_deletion.rs | 7 ++-- 12 files changed, 101 insertions(+), 30 deletions(-) diff --git a/meilidb-core/src/database.rs b/meilidb-core/src/database.rs index 0203e2e26..b65086ac2 100644 --- a/meilidb-core/src/database.rs +++ b/meilidb-core/src/database.rs @@ -4,7 +4,7 @@ use std::path::Path; use std::sync::{Arc, RwLock}; use std::{fs, thread}; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; use heed::types::{Str, Unit}; use heed::{CompactionOption, Result as ZResult}; use log::debug; @@ -33,9 +33,17 @@ macro_rules! r#break_try { }; } -fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc, index: Index) { - for () in receiver { - // consume all updates in order (oldest first) +pub enum UpdateEvent { + NewUpdate, + MustStop, +} + +pub type UpdateEvents = Receiver; +pub type UpdateEventsEmitter = Sender; + +fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc, index: Index) { + let mut receiver = receiver.into_iter(); + while let Some(UpdateEvent::NewUpdate) = receiver.next() { loop { // instantiate a main/parent transaction let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed"); @@ -80,6 +88,8 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc) -> MResult { + let name = name.as_ref(); + let mut indexes_lock = self.indexes.write().unwrap(); + + match indexes_lock.remove_entry(name) { + Some((name, (index, _fn, handle))) => { + // remove the index name from the list of indexes + // and clear all the LMDB dbi + let mut writer = self.env.write_txn()?; + self.indexes_store.delete(&mut writer, &name)?; + store::clear(&mut writer, &index)?; + writer.commit()?; + + // join the update loop thread to ensure it is stopped + handle.join().unwrap(); + + Ok(true) + } + None => Ok(false), + } + } + pub fn set_update_callback(&self, name: impl AsRef, update_fn: BoxUpdateFn) -> bool { let indexes_lock = self.indexes.read().unwrap(); match indexes_lock.get(name.as_ref()) { diff --git a/meilidb-core/src/store/main.rs b/meilidb-core/src/store/main.rs index dca995759..5755c2da1 100644 --- a/meilidb-core/src/store/main.rs +++ b/meilidb-core/src/store/main.rs @@ -18,6 +18,10 @@ pub struct Main { } impl Main { + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.main.clear(writer) + } + pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { let bytes = fst.as_fst().as_bytes(); self.main.put::(writer, WORDS_KEY, bytes) diff --git a/meilidb-core/src/store/mod.rs b/meilidb-core/src/store/mod.rs index f53900a79..2c5bdc31e 100644 --- a/meilidb-core/src/store/mod.rs +++ b/meilidb-core/src/store/mod.rs @@ -26,6 +26,7 @@ use serde::de::{self, Deserialize}; use zerocopy::{AsBytes, FromBytes}; use crate::criterion::Criteria; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::serde::Deserializer; use crate::{query_builder::QueryBuilder, update, DocumentId, Error, MResult}; @@ -91,7 +92,7 @@ pub struct Index { pub updates: Updates, pub updates_results: UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, } impl Index { @@ -139,12 +140,12 @@ impl Index { } pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_schema_update(writer, self.updates, self.updates_results, schema) } pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec) -> ZResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_customs_update(writer, self.updates, self.updates_results, customs) } @@ -173,7 +174,7 @@ impl Index { } pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); update::push_clear_all(writer, self.updates, self.updates_results) } @@ -276,7 +277,7 @@ impl Index { pub fn create( env: &heed::Env, name: &str, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> MResult { // create all the store names let main_name = main_name(name); @@ -316,7 +317,7 @@ pub fn create( pub fn open( env: &heed::Env, name: &str, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> MResult> { // create all the store names let main_name = main_name(name); @@ -376,3 +377,19 @@ pub fn open( updates_notifier, })) } + +pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> { + // send a stop event to the update loop of the index + index.updates_notifier.send(UpdateEvent::MustStop).unwrap(); + + // clear all the stores + index.main.clear(writer)?; + index.postings_lists.clear(writer)?; + index.documents_fields.clear(writer)?; + index.documents_fields_counts.clear(writer)?; + index.synonyms.clear(writer)?; + index.docs_words.clear(writer)?; + index.updates.clear(writer)?; + index.updates_results.clear(writer)?; + Ok(()) +} diff --git a/meilidb-core/src/store/synonyms.rs b/meilidb-core/src/store/synonyms.rs index 80b8bba67..2c497b86a 100644 --- a/meilidb-core/src/store/synonyms.rs +++ b/meilidb-core/src/store/synonyms.rs @@ -22,6 +22,10 @@ impl Synonyms { self.synonyms.delete(writer, word) } + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.synonyms.clear(writer) + } + pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult> { match self.synonyms.get(reader, word)? { Some(bytes) => { diff --git a/meilidb-core/src/store/updates.rs b/meilidb-core/src/store/updates.rs index 4b3cdc51c..984da7b58 100644 --- a/meilidb-core/src/store/updates.rs +++ b/meilidb-core/src/store/updates.rs @@ -52,4 +52,8 @@ impl Updates { None => Ok(None), } } + + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.updates.clear(writer) + } } diff --git a/meilidb-core/src/store/updates_results.rs b/meilidb-core/src/store/updates_results.rs index 0a9d370a9..cf99c2acf 100644 --- a/meilidb-core/src/store/updates_results.rs +++ b/meilidb-core/src/store/updates_results.rs @@ -38,4 +38,8 @@ impl UpdatesResults { let update_id = BEU64::new(update_id); self.updates_results.get(reader, &update_id) } + + pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> { + self.updates_results.clear(writer) + } } diff --git a/meilidb-core/src/update/documents_addition.rs b/meilidb-core/src/update/documents_addition.rs index 7a7f03362..91683a931 100644 --- a/meilidb-core/src/update/documents_addition.rs +++ b/meilidb-core/src/update/documents_addition.rs @@ -4,6 +4,7 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::{duo::Union, SetOperation}; use serde::{Deserialize, Serialize}; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::raw_indexer::RawIndexer; use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer}; use crate::store; @@ -13,7 +14,7 @@ use crate::{Error, MResult, RankedMap}; pub struct DocumentsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, documents: Vec, is_partial: bool, } @@ -22,7 +23,7 @@ impl DocumentsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> DocumentsAddition { DocumentsAddition { updates_store, @@ -36,7 +37,7 @@ impl DocumentsAddition { pub fn new_partial( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> DocumentsAddition { DocumentsAddition { updates_store, @@ -55,7 +56,7 @@ impl DocumentsAddition { where D: serde::Serialize, { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_documents_addition( writer, self.updates_store, diff --git a/meilidb-core/src/update/documents_deletion.rs b/meilidb-core/src/update/documents_deletion.rs index 31a9f4927..fb0f16ea1 100644 --- a/meilidb-core/src/update/documents_deletion.rs +++ b/meilidb-core/src/update/documents_deletion.rs @@ -4,6 +4,7 @@ use fst::{SetBuilder, Streamer}; use meilidb_schema::Schema; use sdset::{duo::DifferenceByKey, SetBuf, SetOperation}; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::serde::extract_document_id; use crate::store; use crate::update::{next_update_id, Update}; @@ -12,7 +13,7 @@ use crate::{DocumentId, Error, MResult, RankedMap}; pub struct DocumentsDeletion { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, documents: Vec, } @@ -20,7 +21,7 @@ impl DocumentsDeletion { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> DocumentsDeletion { DocumentsDeletion { updates_store, @@ -50,7 +51,7 @@ impl DocumentsDeletion { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_documents_deletion( writer, self.updates_store, diff --git a/meilidb-core/src/update/stop_words_addition.rs b/meilidb-core/src/update/stop_words_addition.rs index 6adba450b..12b00e373 100644 --- a/meilidb-core/src/update/stop_words_addition.rs +++ b/meilidb-core/src/update/stop_words_addition.rs @@ -3,13 +3,14 @@ use std::collections::BTreeSet; use fst::{set::OpBuilder, SetBuilder}; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; pub struct StopWordsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, stop_words: BTreeSet, } @@ -17,7 +18,7 @@ impl StopWordsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> StopWordsAddition { StopWordsAddition { updates_store, @@ -33,7 +34,7 @@ impl StopWordsAddition { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_stop_words_addition( writer, self.updates_store, diff --git a/meilidb-core/src/update/stop_words_deletion.rs b/meilidb-core/src/update/stop_words_deletion.rs index 11c72ded9..eb4d1700c 100644 --- a/meilidb-core/src/update/stop_words_deletion.rs +++ b/meilidb-core/src/update/stop_words_deletion.rs @@ -3,6 +3,7 @@ use std::collections::BTreeSet; use fst::{set::OpBuilder, SetBuilder}; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::documents_addition::reindex_all_documents; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; @@ -10,7 +11,7 @@ use crate::{store, MResult}; pub struct StopWordsDeletion { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, stop_words: BTreeSet, } @@ -18,7 +19,7 @@ impl StopWordsDeletion { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> StopWordsDeletion { StopWordsDeletion { updates_store, @@ -34,7 +35,7 @@ impl StopWordsDeletion { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_stop_words_deletion( writer, self.updates_store, diff --git a/meilidb-core/src/update/synonyms_addition.rs b/meilidb-core/src/update/synonyms_addition.rs index 656e5e727..b6a5ab2d4 100644 --- a/meilidb-core/src/update/synonyms_addition.rs +++ b/meilidb-core/src/update/synonyms_addition.rs @@ -4,13 +4,14 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::SetBuf; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; pub struct SynonymsAddition { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, synonyms: BTreeMap>, } @@ -18,7 +19,7 @@ impl SynonymsAddition { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> SynonymsAddition { SynonymsAddition { updates_store, @@ -43,7 +44,7 @@ impl SynonymsAddition { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_synonyms_addition( writer, self.updates_store, diff --git a/meilidb-core/src/update/synonyms_deletion.rs b/meilidb-core/src/update/synonyms_deletion.rs index b76f3f761..6ec875bb6 100644 --- a/meilidb-core/src/update/synonyms_deletion.rs +++ b/meilidb-core/src/update/synonyms_deletion.rs @@ -5,13 +5,14 @@ use fst::{set::OpBuilder, SetBuilder}; use sdset::SetBuf; use crate::automaton::normalize_str; +use crate::database::{UpdateEvent, UpdateEventsEmitter}; use crate::update::{next_update_id, Update}; use crate::{store, MResult}; pub struct SynonymsDeletion { updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, synonyms: BTreeMap>>, } @@ -19,7 +20,7 @@ impl SynonymsDeletion { pub fn new( updates_store: store::Updates, updates_results_store: store::UpdatesResults, - updates_notifier: crossbeam_channel::Sender<()>, + updates_notifier: UpdateEventsEmitter, ) -> SynonymsDeletion { SynonymsDeletion { updates_store, @@ -50,7 +51,7 @@ impl SynonymsDeletion { } pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult { - let _ = self.updates_notifier.send(()); + let _ = self.updates_notifier.send(UpdateEvent::NewUpdate); let update_id = push_synonyms_deletion( writer, self.updates_store,