mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Merge branch 'moving-to-lmdb'
This commit is contained in:
commit
f56636e1e9
78 changed files with 3369 additions and 3351 deletions
189
meilidb-core/src/update/documents_addition.rs
Normal file
189
meilidb-core/src/update/documents_addition.rs
Normal file
|
@ -0,0 +1,189 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use fst::{SetBuilder, set::OpBuilder};
|
||||
use sdset::{SetOperation, duo::Union};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::raw_indexer::RawIndexer;
|
||||
use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
|
||||
use crate::store;
|
||||
use crate::update::{Update, next_update_id, apply_documents_deletion};
|
||||
use crate::{MResult, Error, RankedMap};
|
||||
|
||||
pub struct DocumentsAddition<D> {
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
documents: Vec<D>,
|
||||
}
|
||||
|
||||
impl<D> DocumentsAddition<D> {
|
||||
pub fn new(
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
) -> DocumentsAddition<D>
|
||||
{
|
||||
DocumentsAddition {
|
||||
updates_store,
|
||||
updates_results_store,
|
||||
updates_notifier,
|
||||
documents: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_document(&mut self, document: D) {
|
||||
self.documents.push(document);
|
||||
}
|
||||
|
||||
pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64>
|
||||
where D: serde::Serialize
|
||||
{
|
||||
let update_id = push_documents_addition(
|
||||
&mut writer,
|
||||
self.updates_store,
|
||||
self.updates_results_store,
|
||||
self.documents,
|
||||
)?;
|
||||
writer.commit()?;
|
||||
let _ = self.updates_notifier.send(());
|
||||
|
||||
Ok(update_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl<D> Extend<D> for DocumentsAddition<D> {
|
||||
fn extend<T: IntoIterator<Item=D>>(&mut self, iter: T) {
|
||||
self.documents.extend(iter)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_documents_addition<D: serde::Serialize>(
|
||||
writer: &mut rkv::Writer,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
addition: Vec<D>,
|
||||
) -> MResult<u64>
|
||||
{
|
||||
let mut values = Vec::with_capacity(addition.len());
|
||||
for add in addition {
|
||||
let vec = rmp_serde::to_vec_named(&add)?;
|
||||
let add = rmp_serde::from_read(&vec[..])?;
|
||||
values.push(add);
|
||||
}
|
||||
|
||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||
|
||||
let update = Update::DocumentsAddition(values);
|
||||
updates_store.put_update(writer, last_update_id, &update)?;
|
||||
|
||||
Ok(last_update_id)
|
||||
}
|
||||
|
||||
pub fn apply_documents_addition(
|
||||
writer: &mut rkv::Writer,
|
||||
main_store: store::Main,
|
||||
documents_fields_store: store::DocumentsFields,
|
||||
postings_lists_store: store::PostingsLists,
|
||||
docs_words_store: store::DocsWords,
|
||||
mut ranked_map: RankedMap,
|
||||
addition: Vec<rmpv::Value>,
|
||||
) -> MResult<()>
|
||||
{
|
||||
let mut document_ids = HashSet::new();
|
||||
let mut document_store = RamDocumentStore::new();
|
||||
let mut indexer = RawIndexer::new();
|
||||
|
||||
let schema = match main_store.schema(writer)? {
|
||||
Some(schema) => schema,
|
||||
None => return Err(Error::SchemaMissing),
|
||||
};
|
||||
|
||||
let identifier = schema.identifier_name();
|
||||
|
||||
for document in addition {
|
||||
let document_id = match extract_document_id(identifier, &document)? {
|
||||
Some(id) => id,
|
||||
None => return Err(Error::MissingDocumentId),
|
||||
};
|
||||
|
||||
// 1. store the document id for future deletion
|
||||
document_ids.insert(document_id);
|
||||
|
||||
// 2. index the document fields in ram stores
|
||||
let serializer = Serializer {
|
||||
schema: &schema,
|
||||
document_store: &mut document_store,
|
||||
indexer: &mut indexer,
|
||||
ranked_map: &mut ranked_map,
|
||||
document_id,
|
||||
};
|
||||
|
||||
document.serialize(serializer)?;
|
||||
}
|
||||
|
||||
// 1. remove the previous documents match indexes
|
||||
let documents_to_insert = document_ids.iter().cloned().collect();
|
||||
apply_documents_deletion(
|
||||
writer,
|
||||
main_store,
|
||||
documents_fields_store,
|
||||
postings_lists_store,
|
||||
docs_words_store,
|
||||
ranked_map.clone(),
|
||||
documents_to_insert,
|
||||
)?;
|
||||
|
||||
// 2. insert new document attributes in the database
|
||||
for ((id, attr), value) in document_store.into_inner() {
|
||||
documents_fields_store.put_document_field(writer, id, attr, &value)?;
|
||||
}
|
||||
|
||||
let indexed = indexer.build();
|
||||
let mut delta_words_builder = SetBuilder::memory();
|
||||
|
||||
for (word, delta_set) in indexed.words_doc_indexes {
|
||||
delta_words_builder.insert(&word).unwrap();
|
||||
|
||||
let set = match postings_lists_store.postings_list(writer, &word)? {
|
||||
Some(set) => Union::new(&set, &delta_set).into_set_buf(),
|
||||
None => delta_set,
|
||||
};
|
||||
|
||||
postings_lists_store.put_postings_list(writer, &word, &set)?;
|
||||
}
|
||||
|
||||
for (id, words) in indexed.docs_words {
|
||||
docs_words_store.put_doc_words(writer, id, &words)?;
|
||||
}
|
||||
|
||||
let delta_words = delta_words_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap();
|
||||
|
||||
let words = match main_store.words_fst(writer)? {
|
||||
Some(words) => {
|
||||
let op = OpBuilder::new()
|
||||
.add(words.stream())
|
||||
.add(delta_words.stream())
|
||||
.r#union();
|
||||
|
||||
let mut words_builder = SetBuilder::memory();
|
||||
words_builder.extend_stream(op).unwrap();
|
||||
words_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap()
|
||||
},
|
||||
None => delta_words,
|
||||
};
|
||||
|
||||
main_store.put_words_fst(writer, &words)?;
|
||||
main_store.put_ranked_map(writer, &ranked_map)?;
|
||||
|
||||
let inserted_documents_len = document_ids.len() as u64;
|
||||
main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?;
|
||||
|
||||
Ok(())
|
||||
}
|
180
meilidb-core/src/update/documents_deletion.rs
Normal file
180
meilidb-core/src/update/documents_deletion.rs
Normal file
|
@ -0,0 +1,180 @@
|
|||
use std::collections::{HashMap, HashSet, BTreeSet};
|
||||
|
||||
use fst::{SetBuilder, Streamer};
|
||||
use meilidb_schema::Schema;
|
||||
use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
|
||||
|
||||
use crate::{DocumentId, RankedMap, MResult, Error};
|
||||
use crate::serde::extract_document_id;
|
||||
use crate::update::{Update, next_update_id};
|
||||
use crate::store;
|
||||
|
||||
pub struct DocumentsDeletion {
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
documents: Vec<DocumentId>,
|
||||
}
|
||||
|
||||
impl DocumentsDeletion {
|
||||
pub fn new(
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
) -> DocumentsDeletion
|
||||
{
|
||||
DocumentsDeletion {
|
||||
updates_store,
|
||||
updates_results_store,
|
||||
updates_notifier,
|
||||
documents: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_document_by_id(&mut self, document_id: DocumentId) {
|
||||
self.documents.push(document_id);
|
||||
}
|
||||
|
||||
pub fn delete_document<D>(&mut self, schema: &Schema, document: D) -> MResult<()>
|
||||
where D: serde::Serialize,
|
||||
{
|
||||
let identifier = schema.identifier_name();
|
||||
let document_id = match extract_document_id(identifier, &document)? {
|
||||
Some(id) => id,
|
||||
None => return Err(Error::MissingDocumentId),
|
||||
};
|
||||
|
||||
self.delete_document_by_id(document_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64> {
|
||||
let update_id = push_documents_deletion(
|
||||
&mut writer,
|
||||
self.updates_store,
|
||||
self.updates_results_store,
|
||||
self.documents,
|
||||
)?;
|
||||
writer.commit()?;
|
||||
let _ = self.updates_notifier.send(());
|
||||
|
||||
Ok(update_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Extend<DocumentId> for DocumentsDeletion {
|
||||
fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
|
||||
self.documents.extend(iter)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_documents_deletion(
|
||||
writer: &mut rkv::Writer,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
deletion: Vec<DocumentId>,
|
||||
) -> MResult<u64>
|
||||
{
|
||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||
|
||||
let update = Update::DocumentsDeletion(deletion);
|
||||
updates_store.put_update(writer, last_update_id, &update)?;
|
||||
|
||||
Ok(last_update_id)
|
||||
}
|
||||
|
||||
pub fn apply_documents_deletion(
|
||||
writer: &mut rkv::Writer,
|
||||
main_store: store::Main,
|
||||
documents_fields_store: store::DocumentsFields,
|
||||
postings_lists_store: store::PostingsLists,
|
||||
docs_words_store: store::DocsWords,
|
||||
mut ranked_map: RankedMap,
|
||||
deletion: Vec<DocumentId>,
|
||||
) -> MResult<()>
|
||||
{
|
||||
let idset = SetBuf::from_dirty(deletion);
|
||||
|
||||
let schema = match main_store.schema(writer)? {
|
||||
Some(schema) => schema,
|
||||
None => return Err(Error::SchemaMissing),
|
||||
};
|
||||
|
||||
// collect the ranked attributes according to the schema
|
||||
let ranked_attrs: Vec<_> = schema.iter()
|
||||
.filter_map(|(_, attr, prop)| {
|
||||
if prop.is_ranked() { Some(attr) } else { None }
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut words_document_ids = HashMap::new();
|
||||
for id in idset {
|
||||
// remove all the ranked attributes from the ranked_map
|
||||
for ranked_attr in &ranked_attrs {
|
||||
ranked_map.remove(id, *ranked_attr);
|
||||
}
|
||||
|
||||
if let Some(words) = docs_words_store.doc_words(writer, id)? {
|
||||
let mut stream = words.stream();
|
||||
while let Some(word) = stream.next() {
|
||||
let word = word.to_vec();
|
||||
words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut deleted_documents = HashSet::new();
|
||||
let mut removed_words = BTreeSet::new();
|
||||
for (word, document_ids) in words_document_ids {
|
||||
let document_ids = SetBuf::from_dirty(document_ids);
|
||||
|
||||
if let Some(doc_indexes) = postings_lists_store.postings_list(writer, &word)? {
|
||||
let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
|
||||
let doc_indexes = op.into_set_buf();
|
||||
|
||||
if !doc_indexes.is_empty() {
|
||||
postings_lists_store.put_postings_list(writer, &word, &doc_indexes)?;
|
||||
} else {
|
||||
postings_lists_store.del_postings_list(writer, &word)?;
|
||||
removed_words.insert(word);
|
||||
}
|
||||
}
|
||||
|
||||
for id in document_ids {
|
||||
if documents_fields_store.del_all_document_fields(writer, id)? != 0 {
|
||||
deleted_documents.insert(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let deleted_documents_len = deleted_documents.len() as u64;
|
||||
for id in deleted_documents {
|
||||
docs_words_store.del_doc_words(writer, id)?;
|
||||
}
|
||||
|
||||
let removed_words = fst::Set::from_iter(removed_words).unwrap();
|
||||
let words = match main_store.words_fst(writer)? {
|
||||
Some(words_set) => {
|
||||
let op = fst::set::OpBuilder::new()
|
||||
.add(words_set.stream())
|
||||
.add(removed_words.stream())
|
||||
.difference();
|
||||
|
||||
let mut words_builder = SetBuilder::memory();
|
||||
words_builder.extend_stream(op).unwrap();
|
||||
words_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap()
|
||||
},
|
||||
None => fst::Set::default(),
|
||||
};
|
||||
|
||||
main_store.put_words_fst(writer, &words)?;
|
||||
main_store.put_ranked_map(writer, &ranked_map)?;
|
||||
|
||||
main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
|
||||
|
||||
Ok(())
|
||||
}
|
202
meilidb-core/src/update/mod.rs
Normal file
202
meilidb-core/src/update/mod.rs
Normal file
|
@ -0,0 +1,202 @@
|
|||
mod documents_addition;
|
||||
mod documents_deletion;
|
||||
mod schema_update;
|
||||
mod synonyms_addition;
|
||||
mod synonyms_deletion;
|
||||
|
||||
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
|
||||
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
|
||||
pub use self::schema_update::{apply_schema_update, push_schema_update};
|
||||
pub use self::synonyms_addition::{SynonymsAddition, apply_synonyms_addition};
|
||||
pub use self::synonyms_deletion::{SynonymsDeletion, apply_synonyms_deletion};
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
use std::collections::BTreeMap;
|
||||
use std::cmp;
|
||||
|
||||
use log::debug;
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
use crate::{store, MResult, DocumentId, RankedMap};
|
||||
use meilidb_schema::Schema;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum Update {
|
||||
SchemaUpdate(Schema),
|
||||
DocumentsAddition(Vec<rmpv::Value>),
|
||||
DocumentsDeletion(Vec<DocumentId>),
|
||||
SynonymsAddition(BTreeMap<String, Vec<String>>),
|
||||
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateType {
|
||||
SchemaUpdate { schema: Schema },
|
||||
DocumentsAddition { number: usize },
|
||||
DocumentsDeletion { number: usize },
|
||||
SynonymsAddition { number: usize },
|
||||
SynonymsDeletion { number: usize },
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct DetailedDuration {
|
||||
pub main: Duration,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct UpdateResult {
|
||||
pub update_id: u64,
|
||||
pub update_type: UpdateType,
|
||||
pub result: Result<(), String>,
|
||||
pub detailed_duration: DetailedDuration,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateStatus {
|
||||
Enqueued,
|
||||
Processed(UpdateResult),
|
||||
Unknown,
|
||||
}
|
||||
|
||||
pub fn update_status<T: rkv::Readable>(
|
||||
reader: &T,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
update_id: u64,
|
||||
) -> MResult<UpdateStatus>
|
||||
{
|
||||
match updates_results_store.update_result(reader, update_id)? {
|
||||
Some(result) => Ok(UpdateStatus::Processed(result)),
|
||||
None => {
|
||||
if updates_store.contains(reader, update_id)? {
|
||||
Ok(UpdateStatus::Enqueued)
|
||||
} else {
|
||||
Ok(UpdateStatus::Unknown)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next_update_id(
|
||||
writer: &mut rkv::Writer,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
) -> MResult<u64>
|
||||
{
|
||||
let last_update_id = updates_store.last_update_id(writer)?;
|
||||
let last_update_id = last_update_id.map(|(n, _)| n);
|
||||
|
||||
let last_update_results_id = updates_results_store.last_update_id(writer)?;
|
||||
let last_update_results_id = last_update_results_id.map(|(n, _)| n);
|
||||
|
||||
let max_update_id = cmp::max(last_update_id, last_update_results_id);
|
||||
let new_update_id = max_update_id.map_or(0, |n| n + 1);
|
||||
|
||||
Ok(new_update_id)
|
||||
}
|
||||
|
||||
pub fn update_task(writer: &mut rkv::Writer, index: store::Index) -> MResult<Option<UpdateResult>> {
|
||||
let (update_id, update) = match index.updates.pop_front(writer)? {
|
||||
Some(value) => value,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
debug!("Processing update number {}", update_id);
|
||||
|
||||
let (update_type, result, duration) = match update {
|
||||
Update::SchemaUpdate(schema) => {
|
||||
let start = Instant::now();
|
||||
|
||||
let update_type = UpdateType::SchemaUpdate { schema: schema.clone() };
|
||||
let result = apply_schema_update(writer, index.main, &schema);
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
},
|
||||
Update::DocumentsAddition(documents) => {
|
||||
let start = Instant::now();
|
||||
|
||||
let ranked_map = match index.main.ranked_map(writer)? {
|
||||
Some(ranked_map) => ranked_map,
|
||||
None => RankedMap::default(),
|
||||
};
|
||||
|
||||
let update_type = UpdateType::DocumentsAddition { number: documents.len() };
|
||||
|
||||
let result = apply_documents_addition(
|
||||
writer,
|
||||
index.main,
|
||||
index.documents_fields,
|
||||
index.postings_lists,
|
||||
index.docs_words,
|
||||
ranked_map,
|
||||
documents,
|
||||
);
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
},
|
||||
Update::DocumentsDeletion(documents) => {
|
||||
let start = Instant::now();
|
||||
|
||||
let ranked_map = match index.main.ranked_map(writer)? {
|
||||
Some(ranked_map) => ranked_map,
|
||||
None => RankedMap::default(),
|
||||
};
|
||||
|
||||
let update_type = UpdateType::DocumentsDeletion { number: documents.len() };
|
||||
|
||||
let result = apply_documents_deletion(
|
||||
writer,
|
||||
index.main,
|
||||
index.documents_fields,
|
||||
index.postings_lists,
|
||||
index.docs_words,
|
||||
ranked_map,
|
||||
documents,
|
||||
);
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
},
|
||||
Update::SynonymsAddition(synonyms) => {
|
||||
let start = Instant::now();
|
||||
|
||||
let update_type = UpdateType::SynonymsAddition { number: synonyms.len() };
|
||||
|
||||
let result = apply_synonyms_addition(
|
||||
writer,
|
||||
index.main,
|
||||
index.synonyms,
|
||||
synonyms,
|
||||
);
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
},
|
||||
Update::SynonymsDeletion(synonyms) => {
|
||||
let start = Instant::now();
|
||||
|
||||
let update_type = UpdateType::SynonymsDeletion { number: synonyms.len() };
|
||||
|
||||
let result = apply_synonyms_deletion(
|
||||
writer,
|
||||
index.main,
|
||||
index.synonyms,
|
||||
synonyms,
|
||||
);
|
||||
|
||||
(update_type, result, start.elapsed())
|
||||
},
|
||||
};
|
||||
|
||||
debug!("Processed update number {} {:?} {:?}", update_id, update_type, result);
|
||||
|
||||
let detailed_duration = DetailedDuration { main: duration };
|
||||
let status = UpdateResult {
|
||||
update_id,
|
||||
update_type,
|
||||
result: result.map_err(|e| e.to_string()),
|
||||
detailed_duration,
|
||||
};
|
||||
|
||||
index.updates_results.put_update_result(writer, update_id, &status)?;
|
||||
|
||||
Ok(Some(status))
|
||||
}
|
31
meilidb-core/src/update/schema_update.rs
Normal file
31
meilidb-core/src/update/schema_update.rs
Normal file
|
@ -0,0 +1,31 @@
|
|||
use meilidb_schema::Schema;
|
||||
use crate::{store, error::UnsupportedOperation, MResult};
|
||||
use crate::update::{Update, next_update_id};
|
||||
|
||||
pub fn apply_schema_update(
|
||||
writer: &mut rkv::Writer,
|
||||
main_store: store::Main,
|
||||
new_schema: &Schema,
|
||||
) -> MResult<()>
|
||||
{
|
||||
if let Some(_) = main_store.schema(writer)? {
|
||||
return Err(UnsupportedOperation::SchemaAlreadyExists.into())
|
||||
}
|
||||
|
||||
main_store.put_schema(writer, new_schema)
|
||||
}
|
||||
|
||||
pub fn push_schema_update(
|
||||
writer: &mut rkv::Writer,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
schema: Schema,
|
||||
) -> MResult<u64>
|
||||
{
|
||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||
|
||||
let update = Update::SchemaUpdate(schema);
|
||||
updates_store.put_update(writer, last_update_id, &update)?;
|
||||
|
||||
Ok(last_update_id)
|
||||
}
|
119
meilidb-core/src/update/synonyms_addition.rs
Normal file
119
meilidb-core/src/update/synonyms_addition.rs
Normal file
|
@ -0,0 +1,119 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use fst::{SetBuilder, set::OpBuilder};
|
||||
use sdset::SetBuf;
|
||||
|
||||
use crate::automaton::normalize_str;
|
||||
use crate::update::{Update, next_update_id};
|
||||
use crate::{store, MResult};
|
||||
|
||||
pub struct SynonymsAddition {
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
synonyms: BTreeMap<String, Vec<String>>,
|
||||
}
|
||||
|
||||
impl SynonymsAddition {
|
||||
pub fn new(
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
) -> SynonymsAddition
|
||||
{
|
||||
SynonymsAddition {
|
||||
updates_store,
|
||||
updates_results_store,
|
||||
updates_notifier,
|
||||
synonyms: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I)
|
||||
where S: AsRef<str>,
|
||||
T: AsRef<str>,
|
||||
I: IntoIterator<Item=T>,
|
||||
{
|
||||
let synonym = normalize_str(synonym.as_ref());
|
||||
let alternatives = alternatives.into_iter().map(|s| s.as_ref().to_lowercase());
|
||||
self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives);
|
||||
}
|
||||
|
||||
pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64> {
|
||||
let update_id = push_synonyms_addition(
|
||||
&mut writer,
|
||||
self.updates_store,
|
||||
self.updates_results_store,
|
||||
self.synonyms,
|
||||
)?;
|
||||
writer.commit()?;
|
||||
let _ = self.updates_notifier.send(());
|
||||
|
||||
Ok(update_id)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_synonyms_addition(
|
||||
writer: &mut rkv::Writer,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
addition: BTreeMap<String, Vec<String>>,
|
||||
) -> MResult<u64>
|
||||
{
|
||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||
|
||||
let update = Update::SynonymsAddition(addition);
|
||||
updates_store.put_update(writer, last_update_id, &update)?;
|
||||
|
||||
Ok(last_update_id)
|
||||
}
|
||||
|
||||
pub fn apply_synonyms_addition(
|
||||
writer: &mut rkv::Writer,
|
||||
main_store: store::Main,
|
||||
synonyms_store: store::Synonyms,
|
||||
addition: BTreeMap<String, Vec<String>>,
|
||||
) -> MResult<()>
|
||||
{
|
||||
let mut synonyms_builder = SetBuilder::memory();
|
||||
|
||||
for (word, alternatives) in addition {
|
||||
synonyms_builder.insert(&word).unwrap();
|
||||
|
||||
let alternatives = {
|
||||
let alternatives = SetBuf::from_dirty(alternatives);
|
||||
let mut alternatives_builder = SetBuilder::memory();
|
||||
alternatives_builder.extend_iter(alternatives).unwrap();
|
||||
let bytes = alternatives_builder.into_inner().unwrap();
|
||||
fst::Set::from_bytes(bytes).unwrap()
|
||||
};
|
||||
|
||||
synonyms_store.put_synonyms(writer, word.as_bytes(), &alternatives)?;
|
||||
}
|
||||
|
||||
let delta_synonyms = synonyms_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap();
|
||||
|
||||
let synonyms = match main_store.synonyms_fst(writer)? {
|
||||
Some(synonyms) => {
|
||||
let op = OpBuilder::new()
|
||||
.add(synonyms.stream())
|
||||
.add(delta_synonyms.stream())
|
||||
.r#union();
|
||||
|
||||
let mut synonyms_builder = SetBuilder::memory();
|
||||
synonyms_builder.extend_stream(op).unwrap();
|
||||
synonyms_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap()
|
||||
},
|
||||
None => delta_synonyms,
|
||||
};
|
||||
|
||||
main_store.put_synonyms_fst(writer, &synonyms)?;
|
||||
|
||||
Ok(())
|
||||
}
|
162
meilidb-core/src/update/synonyms_deletion.rs
Normal file
162
meilidb-core/src/update/synonyms_deletion.rs
Normal file
|
@ -0,0 +1,162 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::iter::FromIterator;
|
||||
|
||||
use fst::{SetBuilder, set::OpBuilder};
|
||||
use sdset::SetBuf;
|
||||
|
||||
use crate::automaton::normalize_str;
|
||||
use crate::update::{Update, next_update_id};
|
||||
use crate::{store, MResult};
|
||||
|
||||
pub struct SynonymsDeletion {
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
synonyms: BTreeMap<String, Option<Vec<String>>>,
|
||||
}
|
||||
|
||||
impl SynonymsDeletion {
|
||||
pub fn new(
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
updates_notifier: crossbeam_channel::Sender<()>,
|
||||
) -> SynonymsDeletion
|
||||
{
|
||||
SynonymsDeletion {
|
||||
updates_store,
|
||||
updates_results_store,
|
||||
updates_notifier,
|
||||
synonyms: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_all_alternatives_of<S: AsRef<str>>(&mut self, synonym: S) {
|
||||
let synonym = normalize_str(synonym.as_ref());
|
||||
self.synonyms.insert(synonym, None);
|
||||
}
|
||||
|
||||
pub fn delete_specific_alternatives_of<S, T, I>(&mut self, synonym: S, alternatives: I)
|
||||
where S: AsRef<str>,
|
||||
T: AsRef<str>,
|
||||
I: Iterator<Item=T>,
|
||||
{
|
||||
let synonym = normalize_str(synonym.as_ref());
|
||||
let value = self.synonyms.entry(synonym).or_insert(None);
|
||||
let alternatives = alternatives.map(|s| s.as_ref().to_lowercase());
|
||||
match value {
|
||||
Some(v) => v.extend(alternatives),
|
||||
None => *value = Some(Vec::from_iter(alternatives)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalize(self, mut writer: rkv::Writer) -> MResult<u64> {
|
||||
let update_id = push_synonyms_deletion(
|
||||
&mut writer,
|
||||
self.updates_store,
|
||||
self.updates_results_store,
|
||||
self.synonyms,
|
||||
)?;
|
||||
writer.commit()?;
|
||||
let _ = self.updates_notifier.send(());
|
||||
|
||||
Ok(update_id)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_synonyms_deletion(
|
||||
writer: &mut rkv::Writer,
|
||||
updates_store: store::Updates,
|
||||
updates_results_store: store::UpdatesResults,
|
||||
deletion: BTreeMap<String, Option<Vec<String>>>,
|
||||
) -> MResult<u64>
|
||||
{
|
||||
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
|
||||
|
||||
let update = Update::SynonymsDeletion(deletion);
|
||||
updates_store.put_update(writer, last_update_id, &update)?;
|
||||
|
||||
Ok(last_update_id)
|
||||
}
|
||||
|
||||
pub fn apply_synonyms_deletion(
|
||||
writer: &mut rkv::Writer,
|
||||
main_store: store::Main,
|
||||
synonyms_store: store::Synonyms,
|
||||
deletion: BTreeMap<String, Option<Vec<String>>>,
|
||||
) -> MResult<()>
|
||||
{
|
||||
let mut delete_whole_synonym_builder = SetBuilder::memory();
|
||||
|
||||
for (synonym, alternatives) in deletion {
|
||||
match alternatives {
|
||||
Some(alternatives) => {
|
||||
let prev_alternatives = synonyms_store.synonyms(writer, synonym.as_bytes())?;
|
||||
let prev_alternatives = match prev_alternatives {
|
||||
Some(alternatives) => alternatives,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let delta_alternatives = {
|
||||
let alternatives = SetBuf::from_dirty(alternatives);
|
||||
let mut builder = SetBuilder::memory();
|
||||
builder.extend_iter(alternatives).unwrap();
|
||||
builder.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
let op = OpBuilder::new()
|
||||
.add(prev_alternatives.stream())
|
||||
.add(delta_alternatives.stream())
|
||||
.difference();
|
||||
|
||||
let (alternatives, empty_alternatives) = {
|
||||
let mut builder = SetBuilder::memory();
|
||||
let len = builder.get_ref().len();
|
||||
builder.extend_stream(op).unwrap();
|
||||
let is_empty = len == builder.get_ref().len();
|
||||
let bytes = builder.into_inner().unwrap();
|
||||
let alternatives = fst::Set::from_bytes(bytes).unwrap();
|
||||
|
||||
(alternatives, is_empty)
|
||||
};
|
||||
|
||||
if empty_alternatives {
|
||||
delete_whole_synonym_builder.insert(synonym.as_bytes())?;
|
||||
} else {
|
||||
synonyms_store.put_synonyms(writer, synonym.as_bytes(), &alternatives)?;
|
||||
}
|
||||
},
|
||||
None => {
|
||||
delete_whole_synonym_builder.insert(&synonym).unwrap();
|
||||
synonyms_store.del_synonyms(writer, synonym.as_bytes())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let delta_synonyms = delete_whole_synonym_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap();
|
||||
|
||||
let synonyms = match main_store.synonyms_fst(writer)? {
|
||||
Some(synonyms) => {
|
||||
let op = OpBuilder::new()
|
||||
.add(synonyms.stream())
|
||||
.add(delta_synonyms.stream())
|
||||
.difference();
|
||||
|
||||
let mut synonyms_builder = SetBuilder::memory();
|
||||
synonyms_builder.extend_stream(op).unwrap();
|
||||
synonyms_builder
|
||||
.into_inner()
|
||||
.and_then(fst::Set::from_bytes)
|
||||
.unwrap()
|
||||
},
|
||||
None => fst::Set::default(),
|
||||
};
|
||||
|
||||
main_store.put_synonyms_fst(writer, &synonyms)?;
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue