Make the project be a workspace

This commit is contained in:
Clément Renault 2019-10-04 10:26:32 +02:00
parent 3476939b7e
commit 62a0aefe44
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
40 changed files with 49 additions and 42 deletions

View file

@ -0,0 +1,138 @@
use std::collections::HashSet;
use fst::{SetBuilder, set::OpBuilder};
use meilidb_schema::Schema;
use sdset::{SetOperation, duo::Union};
use serde::Serialize;
use crate::raw_indexer::RawIndexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::store;
use crate::update::{push_documents_addition, apply_documents_deletion};
use crate::{Error, RankedMap};
pub struct DocumentsAddition<D> {
updates_store: store::Updates,
documents: Vec<D>,
}
impl<D> DocumentsAddition<D> {
pub fn new(updates_store: store::Updates) -> DocumentsAddition<D> {
DocumentsAddition { updates_store, documents: Vec::new() }
}
pub fn update_document(&mut self, document: D) {
self.documents.push(document);
}
pub fn finalize(self, writer: &mut rkv::Writer) -> Result<u64, Error>
where D: serde::Serialize
{
push_documents_addition(writer, self.updates_store, self.documents)
}
}
pub fn apply_documents_addition(
writer: &mut rkv::Writer,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
schema: &Schema,
mut ranked_map: RankedMap,
addition: Vec<rmpv::Value>,
) -> Result<(), Error>
{
let mut document_ids = HashSet::new();
let mut document_store = RamDocumentStore::new();
let mut indexer = RawIndexer::new();
let identifier = schema.identifier_name();
for document in addition {
let document_id = match extract_document_id(identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
// 1. store the document id for future deletion
document_ids.insert(document_id);
// 2. index the document fields in ram stores
let serializer = Serializer {
schema,
document_store: &mut document_store,
indexer: &mut indexer,
ranked_map: &mut ranked_map,
document_id,
};
document.serialize(serializer)?;
}
// 1. remove the previous documents match indexes
let documents_to_insert = document_ids.iter().cloned().collect();
apply_documents_deletion(
writer,
main_store,
documents_fields_store,
postings_lists_store,
docs_words_store,
schema,
ranked_map.clone(),
documents_to_insert,
)?;
// 2. insert new document attributes in the database
for ((id, attr), value) in document_store.into_inner() {
documents_fields_store.put_document_field(writer, id, attr, &value)?;
}
let indexed = indexer.build();
let mut delta_words_builder = SetBuilder::memory();
for (word, delta_set) in indexed.words_doc_indexes {
delta_words_builder.insert(&word).unwrap();
let set = match postings_lists_store.postings_list(writer, &word)? {
Some(set) => Union::new(&set, &delta_set).into_set_buf(),
None => delta_set,
};
postings_lists_store.put_postings_list(writer, &word, &set)?;
}
for (id, words) in indexed.docs_words {
docs_words_store.put_doc_words(writer, id, &words)?;
}
let delta_words = delta_words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
let words = match main_store.words_fst(writer)? {
Some(words) => {
let op = OpBuilder::new()
.add(words.stream())
.add(delta_words.stream())
.r#union();
let mut words_builder = SetBuilder::memory();
words_builder.extend_stream(op).unwrap();
words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap()
},
None => delta_words,
};
main_store.put_words_fst(writer, &words)?;
main_store.put_ranked_map(writer, &ranked_map)?;
let inserted_documents_len = document_ids.len() as u64;
main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?;
Ok(())
}

View file

@ -0,0 +1,137 @@
use std::collections::{HashMap, HashSet, BTreeSet};
use fst::{SetBuilder, Streamer};
use meilidb_schema::Schema;
use sdset::{SetBuf, SetOperation, duo::DifferenceByKey};
use crate::{DocumentId, RankedMap, Error};
use crate::serde::extract_document_id;
use crate::update::push_documents_deletion;
use crate::store;
pub struct DocumentsDeletion {
updates_store: store::Updates,
documents: Vec<DocumentId>,
}
impl DocumentsDeletion {
pub fn new(updates_store: store::Updates) -> DocumentsDeletion {
DocumentsDeletion { updates_store, documents: Vec::new() }
}
pub fn delete_document_by_id(&mut self, document_id: DocumentId) {
self.documents.push(document_id);
}
pub fn delete_document<D>(&mut self, schema: &Schema, document: D) -> Result<(), Error>
where D: serde::Serialize,
{
let identifier = schema.identifier_name();
let document_id = match extract_document_id(identifier, &document)? {
Some(id) => id,
None => return Err(Error::MissingDocumentId),
};
self.delete_document_by_id(document_id);
Ok(())
}
pub fn finalize(self, writer: &mut rkv::Writer) -> Result<u64, Error> {
push_documents_deletion(writer, self.updates_store, self.documents)
}
}
impl Extend<DocumentId> for DocumentsDeletion {
fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) {
self.documents.extend(iter)
}
}
pub fn apply_documents_deletion(
writer: &mut rkv::Writer,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
schema: &Schema,
mut ranked_map: RankedMap,
deletion: Vec<DocumentId>,
) -> Result<(), Error>
{
let idset = SetBuf::from_dirty(deletion);
// collect the ranked attributes according to the schema
let ranked_attrs: Vec<_> = schema.iter()
.filter_map(|(_, attr, prop)| {
if prop.is_ranked() { Some(attr) } else { None }
})
.collect();
let mut words_document_ids = HashMap::new();
for id in idset {
// remove all the ranked attributes from the ranked_map
for ranked_attr in &ranked_attrs {
ranked_map.remove(id, *ranked_attr);
}
if let Some(words) = docs_words_store.doc_words(writer, id)? {
let mut stream = words.stream();
while let Some(word) = stream.next() {
let word = word.to_vec();
words_document_ids.entry(word).or_insert_with(Vec::new).push(id);
}
}
}
let mut deleted_documents = HashSet::new();
let mut removed_words = BTreeSet::new();
for (word, document_ids) in words_document_ids {
let document_ids = SetBuf::from_dirty(document_ids);
if let Some(doc_indexes) = postings_lists_store.postings_list(writer, &word)? {
let op = DifferenceByKey::new(&doc_indexes, &document_ids, |d| d.document_id, |id| *id);
let doc_indexes = op.into_set_buf();
if !doc_indexes.is_empty() {
postings_lists_store.put_postings_list(writer, &word, &doc_indexes)?;
} else {
postings_lists_store.del_postings_list(writer, &word)?;
removed_words.insert(word);
}
}
for id in document_ids {
if documents_fields_store.del_all_document_fields(writer, id)? != 0 {
deleted_documents.insert(id);
}
docs_words_store.del_doc_words(writer, id)?;
}
}
let removed_words = fst::Set::from_iter(removed_words).unwrap();
let words = match main_store.words_fst(writer)? {
Some(words_set) => {
let op = fst::set::OpBuilder::new()
.add(words_set.stream())
.add(removed_words.stream())
.difference();
let mut words_builder = SetBuilder::memory();
words_builder.extend_stream(op).unwrap();
words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap()
},
None => fst::Set::default(),
};
main_store.put_words_fst(writer, &words)?;
main_store.put_ranked_map(writer, &ranked_map)?;
let deleted_documents_len = deleted_documents.len() as u64;
main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?;
Ok(())
}

View file

@ -0,0 +1,112 @@
mod documents_addition;
mod documents_deletion;
pub use self::documents_addition::{DocumentsAddition, apply_documents_addition};
pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion};
use std::time::Duration;
use std::collections::BTreeMap;
use serde::{Serialize, Deserialize};
use crate::{store, Error, MResult, DocumentId};
#[derive(Serialize, Deserialize)]
pub enum Update {
DocumentsAddition(Vec<rmpv::Value>),
DocumentsDeletion(Vec<DocumentId>),
SynonymsAddition(BTreeMap<String, Vec<String>>),
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
}
#[derive(Clone, Serialize, Deserialize)]
pub enum UpdateType {
DocumentsAddition { number: usize },
DocumentsDeletion { number: usize },
SynonymsAddition { number: usize },
SynonymsDeletion { number: usize },
}
#[derive(Clone, Serialize, Deserialize)]
pub struct DetailedDuration {
pub main: Duration,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct UpdateResult {
pub update_id: u64,
pub update_type: UpdateType,
pub result: Result<(), String>,
pub detailed_duration: DetailedDuration,
}
#[derive(Clone, Serialize, Deserialize)]
pub enum UpdateStatus {
Enqueued,
Processed(UpdateResult),
Unknown,
}
pub fn update_status<T: rkv::Readable>(
reader: &T,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
update_id: u64,
) -> MResult<UpdateStatus>
{
match updates_results_store.update_result(reader, update_id)? {
Some(result) => Ok(UpdateStatus::Processed(result)),
None => {
if updates_store.contains(reader, update_id)? {
Ok(UpdateStatus::Enqueued)
} else {
Ok(UpdateStatus::Unknown)
}
}
}
}
pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer,
updates_store: store::Updates,
addition: Vec<D>,
) -> Result<u64, Error>
{
let mut values = Vec::with_capacity(addition.len());
for add in addition {
let vec = rmp_serde::to_vec_named(&add)?;
let add = rmp_serde::from_read(&vec[..])?;
values.push(add);
}
let update = Update::DocumentsAddition(values);
Ok(updates_store.push_back(writer, &update)?)
}
pub fn push_documents_deletion(
writer: &mut rkv::Writer,
updates_store: store::Updates,
deletion: Vec<DocumentId>,
) -> Result<u64, Error>
{
let update = Update::DocumentsDeletion(deletion);
Ok(updates_store.push_back(writer, &update)?)
}
pub fn push_synonyms_addition(
writer: &mut rkv::Writer,
updates_store: store::Updates,
addition: BTreeMap<String, Vec<String>>,
) -> Result<u64, Error>
{
let update = Update::SynonymsAddition(addition);
Ok(updates_store.push_back(writer, &update)?)
}
pub fn push_synonyms_deletion(
writer: &mut rkv::Writer,
updates_store: store::Updates,
deletion: BTreeMap<String, Option<Vec<String>>>,
) -> Result<u64, Error>
{
let update = Update::SynonymsDeletion(deletion);
Ok(updates_store.push_back(writer, &update)?)
}