Introduce index soft deletion

This commit is contained in:
Clément Renault 2019-11-06 10:49:13 +01:00
parent 5dd6b697b9
commit f3fc0bed45
12 changed files with 101 additions and 30 deletions

View File

@ -4,7 +4,7 @@ use std::path::Path;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::{fs, thread}; use std::{fs, thread};
use crossbeam_channel::Receiver; use crossbeam_channel::{Receiver, Sender};
use heed::types::{Str, Unit}; use heed::types::{Str, Unit};
use heed::{CompactionOption, Result as ZResult}; use heed::{CompactionOption, Result as ZResult};
use log::debug; use log::debug;
@ -33,9 +33,17 @@ macro_rules! r#break_try {
}; };
} }
fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc<ArcSwapFn>, index: Index) { pub enum UpdateEvent {
for () in receiver { NewUpdate,
// consume all updates in order (oldest first) MustStop,
}
pub type UpdateEvents = Receiver<UpdateEvent>;
pub type UpdateEventsEmitter = Sender<UpdateEvent>;
fn update_awaiter(receiver: UpdateEvents, env: heed::Env, update_fn: Arc<ArcSwapFn>, index: Index) {
let mut receiver = receiver.into_iter();
while let Some(UpdateEvent::NewUpdate) = receiver.next() {
loop { loop {
// instantiate a main/parent transaction // instantiate a main/parent transaction
let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed"); let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed");
@ -80,6 +88,8 @@ fn update_awaiter(receiver: Receiver<()>, env: heed::Env, update_fn: Arc<ArcSwap
} }
} }
} }
debug!("update loop system stopped");
} }
impl Database { impl Database {
@ -130,7 +140,7 @@ impl Database {
// send an update notification to make sure that // send an update notification to make sure that
// possible pre-boot updates are consumed // possible pre-boot updates are consumed
sender.send(()).unwrap(); sender.send(UpdateEvent::NewUpdate).unwrap();
let result = indexes.insert(index_name, (index, update_fn, handle)); let result = indexes.insert(index_name, (index, update_fn, handle));
assert!( assert!(
@ -186,6 +196,28 @@ impl Database {
} }
} }
pub fn delete_index(&self, name: impl AsRef<str>) -> MResult<bool> {
let name = name.as_ref();
let mut indexes_lock = self.indexes.write().unwrap();
match indexes_lock.remove_entry(name) {
Some((name, (index, _fn, handle))) => {
// remove the index name from the list of indexes
// and clear all the LMDB dbi
let mut writer = self.env.write_txn()?;
self.indexes_store.delete(&mut writer, &name)?;
store::clear(&mut writer, &index)?;
writer.commit()?;
// join the update loop thread to ensure it is stopped
handle.join().unwrap();
Ok(true)
}
None => Ok(false),
}
}
pub fn set_update_callback(&self, name: impl AsRef<str>, update_fn: BoxUpdateFn) -> bool { pub fn set_update_callback(&self, name: impl AsRef<str>, update_fn: BoxUpdateFn) -> bool {
let indexes_lock = self.indexes.read().unwrap(); let indexes_lock = self.indexes.read().unwrap();
match indexes_lock.get(name.as_ref()) { match indexes_lock.get(name.as_ref()) {

View File

@ -18,6 +18,10 @@ pub struct Main {
} }
impl Main { impl Main {
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.main.clear(writer)
}
pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> { pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> {
let bytes = fst.as_fst().as_bytes(); let bytes = fst.as_fst().as_bytes();
self.main.put::<Str, ByteSlice>(writer, WORDS_KEY, bytes) self.main.put::<Str, ByteSlice>(writer, WORDS_KEY, bytes)

View File

@ -26,6 +26,7 @@ use serde::de::{self, Deserialize};
use zerocopy::{AsBytes, FromBytes}; use zerocopy::{AsBytes, FromBytes};
use crate::criterion::Criteria; use crate::criterion::Criteria;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::serde::Deserializer; use crate::serde::Deserializer;
use crate::{query_builder::QueryBuilder, update, DocumentId, Error, MResult}; use crate::{query_builder::QueryBuilder, update, DocumentId, Error, MResult};
@ -91,7 +92,7 @@ pub struct Index {
pub updates: Updates, pub updates: Updates,
pub updates_results: UpdatesResults, pub updates_results: UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
} }
impl Index { impl Index {
@ -139,12 +140,12 @@ impl Index {
} }
pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult<u64> { pub fn schema_update(&self, writer: &mut heed::RwTxn, schema: Schema) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_schema_update(writer, self.updates, self.updates_results, schema) update::push_schema_update(writer, self.updates, self.updates_results, schema)
} }
pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec<u8>) -> ZResult<u64> { pub fn customs_update(&self, writer: &mut heed::RwTxn, customs: Vec<u8>) -> ZResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_customs_update(writer, self.updates, self.updates_results, customs) update::push_customs_update(writer, self.updates, self.updates_results, customs)
} }
@ -173,7 +174,7 @@ impl Index {
} }
pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult<u64> { pub fn clear_all(&self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_clear_all(writer, self.updates, self.updates_results) update::push_clear_all(writer, self.updates, self.updates_results)
} }
@ -276,7 +277,7 @@ impl Index {
pub fn create( pub fn create(
env: &heed::Env, env: &heed::Env,
name: &str, name: &str,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> MResult<Index> { ) -> MResult<Index> {
// create all the store names // create all the store names
let main_name = main_name(name); let main_name = main_name(name);
@ -316,7 +317,7 @@ pub fn create(
pub fn open( pub fn open(
env: &heed::Env, env: &heed::Env,
name: &str, name: &str,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> MResult<Option<Index>> { ) -> MResult<Option<Index>> {
// create all the store names // create all the store names
let main_name = main_name(name); let main_name = main_name(name);
@ -376,3 +377,19 @@ pub fn open(
updates_notifier, updates_notifier,
})) }))
} }
pub fn clear(writer: &mut heed::RwTxn, index: &Index) -> MResult<()> {
// send a stop event to the update loop of the index
index.updates_notifier.send(UpdateEvent::MustStop).unwrap();
// clear all the stores
index.main.clear(writer)?;
index.postings_lists.clear(writer)?;
index.documents_fields.clear(writer)?;
index.documents_fields_counts.clear(writer)?;
index.synonyms.clear(writer)?;
index.docs_words.clear(writer)?;
index.updates.clear(writer)?;
index.updates_results.clear(writer)?;
Ok(())
}

View File

@ -22,6 +22,10 @@ impl Synonyms {
self.synonyms.delete(writer, word) self.synonyms.delete(writer, word)
} }
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.synonyms.clear(writer)
}
pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult<Option<fst::Set>> { pub fn synonyms(self, reader: &heed::RoTxn, word: &[u8]) -> ZResult<Option<fst::Set>> {
match self.synonyms.get(reader, word)? { match self.synonyms.get(reader, word)? {
Some(bytes) => { Some(bytes) => {

View File

@ -52,4 +52,8 @@ impl Updates {
None => Ok(None), None => Ok(None),
} }
} }
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.updates.clear(writer)
}
} }

View File

@ -38,4 +38,8 @@ impl UpdatesResults {
let update_id = BEU64::new(update_id); let update_id = BEU64::new(update_id);
self.updates_results.get(reader, &update_id) self.updates_results.get(reader, &update_id)
} }
pub fn clear(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.updates_results.clear(writer)
}
} }

View File

@ -4,6 +4,7 @@ use fst::{set::OpBuilder, SetBuilder};
use sdset::{duo::Union, SetOperation}; use sdset::{duo::Union, SetOperation};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::raw_indexer::RawIndexer; use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer}; use crate::serde::{extract_document_id, serialize_value, Deserializer, Serializer};
use crate::store; use crate::store;
@ -13,7 +14,7 @@ use crate::{Error, MResult, RankedMap};
pub struct DocumentsAddition<D> { pub struct DocumentsAddition<D> {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
documents: Vec<D>, documents: Vec<D>,
is_partial: bool, is_partial: bool,
} }
@ -22,7 +23,7 @@ impl<D> DocumentsAddition<D> {
pub fn new( pub fn new(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> DocumentsAddition<D> { ) -> DocumentsAddition<D> {
DocumentsAddition { DocumentsAddition {
updates_store, updates_store,
@ -36,7 +37,7 @@ impl<D> DocumentsAddition<D> {
pub fn new_partial( pub fn new_partial(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> DocumentsAddition<D> { ) -> DocumentsAddition<D> {
DocumentsAddition { DocumentsAddition {
updates_store, updates_store,
@ -55,7 +56,7 @@ impl<D> DocumentsAddition<D> {
where where
D: serde::Serialize, D: serde::Serialize,
{ {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_documents_addition( let update_id = push_documents_addition(
writer, writer,
self.updates_store, self.updates_store,

View File

@ -4,6 +4,7 @@ use fst::{SetBuilder, Streamer};
use meilidb_schema::Schema; use meilidb_schema::Schema;
use sdset::{duo::DifferenceByKey, SetBuf, SetOperation}; use sdset::{duo::DifferenceByKey, SetBuf, SetOperation};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::serde::extract_document_id; use crate::serde::extract_document_id;
use crate::store; use crate::store;
use crate::update::{next_update_id, Update}; use crate::update::{next_update_id, Update};
@ -12,7 +13,7 @@ use crate::{DocumentId, Error, MResult, RankedMap};
pub struct DocumentsDeletion { pub struct DocumentsDeletion {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
documents: Vec<DocumentId>, documents: Vec<DocumentId>,
} }
@ -20,7 +21,7 @@ impl DocumentsDeletion {
pub fn new( pub fn new(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> DocumentsDeletion { ) -> DocumentsDeletion {
DocumentsDeletion { DocumentsDeletion {
updates_store, updates_store,
@ -50,7 +51,7 @@ impl DocumentsDeletion {
} }
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> { pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_documents_deletion( let update_id = push_documents_deletion(
writer, writer,
self.updates_store, self.updates_store,

View File

@ -3,13 +3,14 @@ use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder}; use fst::{set::OpBuilder, SetBuilder};
use crate::automaton::normalize_str; use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update}; use crate::update::{next_update_id, Update};
use crate::{store, MResult}; use crate::{store, MResult};
pub struct StopWordsAddition { pub struct StopWordsAddition {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
stop_words: BTreeSet<String>, stop_words: BTreeSet<String>,
} }
@ -17,7 +18,7 @@ impl StopWordsAddition {
pub fn new( pub fn new(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> StopWordsAddition { ) -> StopWordsAddition {
StopWordsAddition { StopWordsAddition {
updates_store, updates_store,
@ -33,7 +34,7 @@ impl StopWordsAddition {
} }
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> { pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_stop_words_addition( let update_id = push_stop_words_addition(
writer, writer,
self.updates_store, self.updates_store,

View File

@ -3,6 +3,7 @@ use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder}; use fst::{set::OpBuilder, SetBuilder};
use crate::automaton::normalize_str; use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::documents_addition::reindex_all_documents; use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update}; use crate::update::{next_update_id, Update};
use crate::{store, MResult}; use crate::{store, MResult};
@ -10,7 +11,7 @@ use crate::{store, MResult};
pub struct StopWordsDeletion { pub struct StopWordsDeletion {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
stop_words: BTreeSet<String>, stop_words: BTreeSet<String>,
} }
@ -18,7 +19,7 @@ impl StopWordsDeletion {
pub fn new( pub fn new(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> StopWordsDeletion { ) -> StopWordsDeletion {
StopWordsDeletion { StopWordsDeletion {
updates_store, updates_store,
@ -34,7 +35,7 @@ impl StopWordsDeletion {
} }
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> { pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_stop_words_deletion( let update_id = push_stop_words_deletion(
writer, writer,
self.updates_store, self.updates_store,

View File

@ -4,13 +4,14 @@ use fst::{set::OpBuilder, SetBuilder};
use sdset::SetBuf; use sdset::SetBuf;
use crate::automaton::normalize_str; use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update}; use crate::update::{next_update_id, Update};
use crate::{store, MResult}; use crate::{store, MResult};
pub struct SynonymsAddition { pub struct SynonymsAddition {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
synonyms: BTreeMap<String, Vec<String>>, synonyms: BTreeMap<String, Vec<String>>,
} }
@ -18,7 +19,7 @@ impl SynonymsAddition {
pub fn new( pub fn new(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> SynonymsAddition { ) -> SynonymsAddition {
SynonymsAddition { SynonymsAddition {
updates_store, updates_store,
@ -43,7 +44,7 @@ impl SynonymsAddition {
} }
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> { pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_synonyms_addition( let update_id = push_synonyms_addition(
writer, writer,
self.updates_store, self.updates_store,

View File

@ -5,13 +5,14 @@ use fst::{set::OpBuilder, SetBuilder};
use sdset::SetBuf; use sdset::SetBuf;
use crate::automaton::normalize_str; use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update}; use crate::update::{next_update_id, Update};
use crate::{store, MResult}; use crate::{store, MResult};
pub struct SynonymsDeletion { pub struct SynonymsDeletion {
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
synonyms: BTreeMap<String, Option<Vec<String>>>, synonyms: BTreeMap<String, Option<Vec<String>>>,
} }
@ -19,7 +20,7 @@ impl SynonymsDeletion {
pub fn new( pub fn new(
updates_store: store::Updates, updates_store: store::Updates,
updates_results_store: store::UpdatesResults, updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>, updates_notifier: UpdateEventsEmitter,
) -> SynonymsDeletion { ) -> SynonymsDeletion {
SynonymsDeletion { SynonymsDeletion {
updates_store, updates_store,
@ -50,7 +51,7 @@ impl SynonymsDeletion {
} }
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> { pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(()); let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
let update_id = push_synonyms_deletion( let update_id = push_synonyms_deletion(
writer, writer,
self.updates_store, self.updates_store,