Merge pull request #183 from meilisearch/number-of-documents

Compute the number of documents on updates
This commit is contained in:
Clément Renault 2019-09-14 16:32:18 +02:00 committed by GitHub
commit aaeb25828f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 67 additions and 24 deletions

View File

@ -35,14 +35,16 @@ impl DocumentsIndex {
Ok(())
}
pub fn del_all_document_fields(&self, id: DocumentId) -> RocksDbResult<()> {
pub fn del_all_document_fields(&self, id: DocumentId) -> RocksDbResult<usize> {
let (start, end) = document_fields_range(id);
let mut count = 0;
for (key, _) in self.0.range(start, end)? {
self.0.remove(key)?;
count += 1;
}
Ok(())
Ok(count)
}
pub fn document_fields(&self, id: DocumentId) -> RocksDbResult<DocumentFieldsIter> {
@ -52,7 +54,7 @@ impl DocumentsIndex {
Ok(DocumentFieldsIter(iter))
}
pub fn len(&self) -> RocksDbResult<usize> {
pub fn len(&self) -> RocksDbResult<u64> {
let mut last_document_id = None;
let mut count = 0;

View File

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::convert::TryInto;
use meilidb_schema::Schema;
@ -9,6 +10,7 @@ const SCHEMA_KEY: &str = "schema";
const WORDS_KEY: &str = "words";
const SYNONYMS_KEY: &str = "synonyms";
const RANKED_MAP_KEY: &str = "ranked-map";
const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
#[derive(Clone)]
pub struct MainIndex(pub(crate) crate::CfTree);
@ -79,4 +81,22 @@ impl MainIndex {
self.0.insert(RANKED_MAP_KEY, bytes)?;
Ok(())
}
pub fn number_of_documents(&self) -> Result<u64, Error> {
match self.0.get(NUMBER_OF_DOCUMENTS_KEY)? {
Some(bytes) => {
let array = (*bytes).try_into().unwrap();
Ok(u64::from_be_bytes(array))
},
None => Ok(0),
}
}
pub fn set_number_of_documents<F>(&self, f: F) -> Result<u64, Error>
where F: FnOnce(u64) -> u64,
{
let new = self.number_of_documents().map(f)?;
self.0.insert(NUMBER_OF_DOCUMENTS_KEY, new.to_be_bytes())?;
Ok(new)
}
}

View File

@ -164,7 +164,7 @@ fn last_update_id(
#[derive(Copy, Clone)]
pub struct IndexStats {
pub number_of_words: usize,
pub number_of_documents: usize,
pub number_of_documents: u64,
pub number_attrs_in_ranked_map: usize,
}
@ -192,6 +192,7 @@ pub(crate) struct Cache {
pub synonyms: Arc<fst::Set>,
pub schema: Schema,
pub ranked_map: RankedMap,
pub number_of_documents: u64,
}
impl Index {
@ -241,7 +242,9 @@ impl Index {
None => RankedMap::default(),
};
let cache = Cache { words, synonyms, schema, ranked_map };
let number_of_documents = documents_index.len()?;
let cache = Cache { words, synonyms, schema, ranked_map, number_of_documents };
let cache = Arc::new(ArcSwap::from_pointee(cache));
let last_update_id = last_update_id(&updates_index, &updates_results_index)?;
@ -280,7 +283,7 @@ impl Index {
let cache = self.cache.load();
Ok(IndexStats {
number_of_words: cache.words.len(),
number_of_documents: self.documents_index.len()?,
number_of_documents: cache.number_of_documents,
number_attrs_in_ranked_map: cache.ranked_map.len(),
})
}
@ -319,6 +322,10 @@ impl Index {
self.custom_settings_index.clone()
}
pub fn number_of_documents(&self) -> u64 {
self.cache.load().number_of_documents
}
pub fn documents_addition<D>(&self) -> DocumentsAddition<D> {
DocumentsAddition::new(self)
}

View File

@ -5,11 +5,10 @@ use fst::{SetBuilder, set::OpBuilder};
use sdset::{SetOperation, duo::Union};
use serde::Serialize;
use crate::RankedMap;
use crate::database::{Error, Index, index::Cache, apply_documents_deletion};
use crate::indexer::Indexer;
use crate::serde::{extract_document_id, Serializer, RamDocumentStore};
use crate::RankedMap;
use crate::database::{Error, Index, index::Cache, apply_documents_deletion};
pub struct DocumentsAddition<'a, D> {
index: &'a Index,
@ -73,8 +72,8 @@ pub fn apply_documents_addition(
let words = ref_index.words_index;
// 1. remove the previous documents match indexes
let document_ids = document_ids.into_iter().collect();
apply_documents_deletion(index, ranked_map.clone(), document_ids)?;
let documents_to_insert = document_ids.iter().cloned().collect();
apply_documents_deletion(index, ranked_map.clone(), documents_to_insert)?;
// 2. insert new document attributes in the database
for ((id, attr), value) in document_store.into_inner() {
@ -124,13 +123,16 @@ pub fn apply_documents_addition(
main.set_words_set(&words)?;
main.set_ranked_map(&ranked_map)?;
let inserted_documents_len = document_ids.len() as u64;
let number_of_documents = main.set_number_of_documents(|old| old + inserted_documents_len)?;
// update the "consistent" view of the Index
let cache = ref_index.cache;
let words = Arc::new(words);
let synonyms = cache.synonyms.clone();
let schema = cache.schema.clone();
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = Cache { words, synonyms, schema, ranked_map, number_of_documents };
index.cache.store(Arc::new(cache));
Ok(())

View File

@ -1,4 +1,4 @@
use std::collections::{HashMap, BTreeSet};
use std::collections::{HashMap, HashSet, BTreeSet};
use std::sync::Arc;
use fst::{SetBuilder, Streamer};
@ -88,6 +88,7 @@ pub fn apply_documents_deletion(
}
}
let mut deleted_documents = HashSet::new();
let mut removed_words = BTreeSet::new();
for (word, document_ids) in words_document_ids {
let document_ids = SetBuf::from_dirty(document_ids);
@ -105,7 +106,9 @@ pub fn apply_documents_deletion(
}
for id in document_ids {
documents.del_all_document_fields(id)?;
if documents.del_all_document_fields(id)? != 0 {
deleted_documents.insert(id);
}
docs_words.del_doc_words(id)?;
}
}
@ -131,13 +134,16 @@ pub fn apply_documents_deletion(
main.set_words_set(&words)?;
main.set_ranked_map(&ranked_map)?;
let deleted_documents_len = deleted_documents.len() as u64;
let number_of_documents = main.set_number_of_documents(|old| old - deleted_documents_len)?;
// update the "consistent" view of the Index
let cache = ref_index.cache;
let words = Arc::new(words);
let synonyms = cache.synonyms.clone();
let schema = cache.schema.clone();
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = Cache { words, synonyms, schema, ranked_map, number_of_documents };
index.cache.store(Arc::new(cache));
Ok(())

View File

@ -85,8 +85,9 @@ pub fn apply_synonyms_addition(
let ranked_map = cache.ranked_map.clone();
let synonyms = Arc::new(synonyms);
let schema = cache.schema.clone();
let number_of_documents = cache.number_of_documents;
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = Cache { words, synonyms, schema, ranked_map, number_of_documents };
index.cache.store(Arc::new(cache));
Ok(())

View File

@ -128,8 +128,9 @@ pub fn apply_synonyms_deletion(
let ranked_map = cache.ranked_map.clone();
let synonyms = Arc::new(synonyms);
let schema = cache.schema.clone();
let number_of_documents = cache.number_of_documents;
let cache = Cache { words, synonyms, schema, ranked_map };
let cache = Cache { words, synonyms, schema, ranked_map, number_of_documents };
index.cache.store(Arc::new(cache));
Ok(())

View File

@ -33,6 +33,7 @@ fn insert_delete_document() {
let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok());
assert_eq!(index.number_of_documents(), 1);
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 1);
@ -44,6 +45,7 @@ fn insert_delete_document() {
let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok());
assert_eq!(index.number_of_documents(), 0);
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 0);
@ -71,17 +73,19 @@ fn replace_document() {
let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok());
assert_eq!(index.number_of_documents(), 1);
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 1);
assert_eq!(index.document(None, docs[0].id).unwrap().as_ref(), Some(&doc1));
let mut deletion = index.documents_addition();
deletion.update_document(&doc2);
let update_id = deletion.finalize().unwrap();
let mut addition = index.documents_addition();
addition.update_document(&doc2);
let update_id = addition.finalize().unwrap();
let status = index.update_status_blocking(update_id).unwrap();
assert!(as_been_updated.swap(false, Relaxed));
assert!(status.result.is_ok());
assert_eq!(index.number_of_documents(), 1);
let docs = index.query_builder().query("hello", 0..10).unwrap();
assert_eq!(docs.len(), 0);