mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-23 19:57:30 +01:00
Merge pull request #136 from shekhirin/index-fields-ids-distribution-cache
feat(index): store fields distribution in index
This commit is contained in:
commit
ee3f93c029
@ -19,6 +19,11 @@ impl<'a> ExternalDocumentsIds<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if hard and soft external documents lists are empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.hard.is_empty() && self.soft.is_empty()
|
||||
}
|
||||
|
||||
pub fn get<A: AsRef<[u8]>>(&self, external_id: A) -> Option<u32> {
|
||||
let external_id = external_id.as_ref();
|
||||
match self.soft.get(external_id).or_else(|| self.hard.get(external_id)) {
|
||||
|
@ -10,7 +10,7 @@ use chrono::{Utc, DateTime};
|
||||
|
||||
use crate::facet::FacetType;
|
||||
use crate::fields_ids_map::FieldsIdsMap;
|
||||
use crate::{default_criteria, Criterion, Search, FacetDistribution};
|
||||
use crate::{default_criteria, Criterion, Search, FacetDistribution, FieldsDistribution};
|
||||
use crate::{BEU32, DocumentId, FieldId, ExternalDocumentsIds};
|
||||
use crate::{
|
||||
RoaringBitmapCodec, RoaringBitmapLenCodec, BEU32StrCodec,
|
||||
@ -23,6 +23,7 @@ pub const DOCUMENTS_IDS_KEY: &str = "documents-ids";
|
||||
pub const FACETED_DOCUMENTS_IDS_PREFIX: &str = "faceted-documents-ids";
|
||||
pub const FACETED_FIELDS_KEY: &str = "faceted-fields";
|
||||
pub const FIELDS_IDS_MAP_KEY: &str = "fields-ids-map";
|
||||
pub const FIELDS_DISTRIBUTION_KEY: &str = "fields-distribution";
|
||||
pub const PRIMARY_KEY_KEY: &str = "primary-key";
|
||||
pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields";
|
||||
pub const HARD_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "hard-external-documents-ids";
|
||||
@ -204,23 +205,18 @@ impl Index {
|
||||
Ok(self.main.get::<_, Str, SerdeJson<FieldsIdsMap>>(rtxn, FIELDS_IDS_MAP_KEY)?.unwrap_or_default())
|
||||
}
|
||||
|
||||
/* fields ids distribution */
|
||||
/* fields distribution */
|
||||
|
||||
/// Returns the fields ids distribution which associate the internal field ids
|
||||
/// with the number of times it occurs in the obkv documents.
|
||||
// TODO store in the index itself and change only within updates that modify the documents
|
||||
pub fn fields_ids_distribution(&self, rtxn: &RoTxn) -> anyhow::Result<HashMap<FieldId, u64>> {
|
||||
let mut distribution = HashMap::new();
|
||||
/// Writes the fields distribution which associates every field name with
|
||||
/// the number of times it occurs in the documents.
|
||||
pub fn put_fields_distribution(&self, wtxn: &mut RwTxn, distribution: &FieldsDistribution) -> heed::Result<()> {
|
||||
self.main.put::<_, Str, SerdeJson<FieldsDistribution>>(wtxn, FIELDS_DISTRIBUTION_KEY, distribution)
|
||||
}
|
||||
|
||||
for document in self.documents.iter(rtxn)? {
|
||||
let (_, obkv) = document?;
|
||||
|
||||
for (field_id, _) in obkv.iter() {
|
||||
*distribution.entry(field_id).or_default() += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(distribution)
|
||||
/// Returns the fields distribution which associates every field name with
|
||||
/// the number of times it occurs in the documents.
|
||||
pub fn fields_distribution(&self, rtxn: &RoTxn) -> heed::Result<FieldsDistribution> {
|
||||
Ok(self.main.get::<_, Str, SerdeJson<FieldsDistribution>>(rtxn, FIELDS_DISTRIBUTION_KEY)?.unwrap_or_default())
|
||||
}
|
||||
|
||||
/* displayed fields */
|
||||
@ -469,40 +465,34 @@ impl Index {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use heed::EnvOpenOptions;
|
||||
use maplit::hashmap;
|
||||
|
||||
use crate::Index;
|
||||
use crate::update::{IndexDocuments, UpdateFormat};
|
||||
|
||||
fn prepare_index() -> Index {
|
||||
#[test]
|
||||
fn initial_fields_distribution() {
|
||||
let path = tempfile::tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(10 * 1024 * 1024); // 10 MB
|
||||
let index = Index::new(options, &path).unwrap();
|
||||
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let content = &br#"
|
||||
{ "name": "kevin" }
|
||||
{ "name": "bob", "age": 20 }
|
||||
"#[..];
|
||||
let content = &br#"[
|
||||
{ "name": "kevin" },
|
||||
{ "name": "bob", "age": 20 }
|
||||
]"#[..];
|
||||
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
|
||||
builder.update_format(UpdateFormat::JsonStream);
|
||||
builder.update_format(UpdateFormat::Json);
|
||||
builder.execute(content, |_, _| ()).unwrap();
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
index
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fields_ids_distribution() {
|
||||
let index = prepare_index();
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
|
||||
let fields_ids_distribution = index.fields_ids_distribution(&rtxn).unwrap();
|
||||
assert_eq!(fields_ids_distribution.len(), 2);
|
||||
assert_eq!(fields_ids_distribution.get(&fields_ids_map.id("age").unwrap()), Some(&1));
|
||||
assert_eq!(fields_ids_distribution.get(&fields_ids_map.id("name").unwrap()), Some(&2));
|
||||
let fields_distribution = index.fields_distribution(&rtxn).unwrap();
|
||||
assert_eq!(fields_distribution, hashmap!{
|
||||
"name".to_string() => 2,
|
||||
"age".to_string() => 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,7 @@ pub type Attribute = u32;
|
||||
pub type DocumentId = u32;
|
||||
pub type FieldId = u8;
|
||||
pub type Position = u32;
|
||||
pub type FieldsDistribution = HashMap<String, u64>;
|
||||
|
||||
type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>;
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
use chrono::Utc;
|
||||
use roaring::RoaringBitmap;
|
||||
use crate::{ExternalDocumentsIds, Index};
|
||||
use crate::{ExternalDocumentsIds, Index, FieldsDistribution};
|
||||
|
||||
pub struct ClearDocuments<'t, 'u, 'i> {
|
||||
wtxn: &'t mut heed::RwTxn<'i, 'u>,
|
||||
@ -42,6 +42,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
|
||||
self.index.put_words_prefixes_fst(self.wtxn, &fst::Set::default())?;
|
||||
self.index.put_external_documents_ids(self.wtxn, &ExternalDocumentsIds::default())?;
|
||||
self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?;
|
||||
self.index.put_fields_distribution(self.wtxn, &FieldsDistribution::default())?;
|
||||
|
||||
// We clean all the faceted documents ids.
|
||||
for (field_id, _) in faceted_fields {
|
||||
@ -61,3 +62,54 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
|
||||
Ok(number_of_documents)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use heed::EnvOpenOptions;
|
||||
|
||||
use crate::update::{IndexDocuments, UpdateFormat};
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn clear_documents() {
|
||||
let path = tempfile::tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(10 * 1024 * 1024); // 10 MB
|
||||
let index = Index::new(options, &path).unwrap();
|
||||
|
||||
let mut wtxn = index.write_txn().unwrap();
|
||||
let content = &br#"[
|
||||
{ "id": 0, "name": "kevin", "age": 20 },
|
||||
{ "id": 1, "name": "kevina" },
|
||||
{ "id": 2, "name": "benoit", "country": "France" }
|
||||
]"#[..];
|
||||
let mut builder = IndexDocuments::new(&mut wtxn, &index, 0);
|
||||
builder.update_format(UpdateFormat::Json);
|
||||
builder.execute(content, |_, _| ()).unwrap();
|
||||
|
||||
// Clear all documents from the database.
|
||||
let builder = ClearDocuments::new(&mut wtxn, &index, 1);
|
||||
assert_eq!(builder.execute().unwrap(), 3);
|
||||
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
|
||||
assert_eq!(index.fields_ids_map(&rtxn).unwrap().len(), 4);
|
||||
|
||||
assert!(index.words_fst(&rtxn).unwrap().is_empty());
|
||||
assert!(index.words_prefixes_fst(&rtxn).unwrap().is_empty());
|
||||
assert!(index.external_documents_ids(&rtxn).unwrap().is_empty());
|
||||
assert!(index.documents_ids(&rtxn).unwrap().is_empty());
|
||||
assert!(index.fields_distribution(&rtxn).unwrap().is_empty());
|
||||
|
||||
assert!(index.word_docids.is_empty(&rtxn).unwrap());
|
||||
assert!(index.word_prefix_docids.is_empty(&rtxn).unwrap());
|
||||
assert!(index.docid_word_positions.is_empty(&rtxn).unwrap());
|
||||
assert!(index.word_pair_proximity_docids.is_empty(&rtxn).unwrap());
|
||||
assert!(index.word_prefix_pair_proximity_docids.is_empty(&rtxn).unwrap());
|
||||
assert!(index.facet_field_id_value_docids.is_empty(&rtxn).unwrap());
|
||||
assert!(index.field_id_docid_facet_values.is_empty(&rtxn).unwrap());
|
||||
assert!(index.documents.is_empty(&rtxn).unwrap());
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use chrono::Utc;
|
||||
use fst::IntoStreamer;
|
||||
@ -90,6 +93,9 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
||||
documents,
|
||||
} = self.index;
|
||||
|
||||
// Number of fields for each document that has been deleted.
|
||||
let mut fields_ids_distribution_diff = HashMap::new();
|
||||
|
||||
// Retrieve the words and the external documents ids contained in the documents.
|
||||
let mut words = Vec::new();
|
||||
let mut external_ids = Vec::new();
|
||||
@ -100,6 +106,10 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
||||
let key = BEU32::new(docid);
|
||||
let mut iter = documents.range_mut(self.wtxn, &(key..=key))?;
|
||||
if let Some((_key, obkv)) = iter.next().transpose()? {
|
||||
for (field_id, _) in obkv.iter() {
|
||||
*fields_ids_distribution_diff.entry(field_id).or_default() += 1;
|
||||
}
|
||||
|
||||
if let Some(content) = obkv.get(id_field) {
|
||||
let external_id = match serde_json::from_slice(content).unwrap() {
|
||||
Value::String(string) => SmallString32::from(string.as_str()),
|
||||
@ -112,7 +122,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
||||
}
|
||||
drop(iter);
|
||||
|
||||
// We iterate througt the words positions of the document id,
|
||||
// We iterate through the words positions of the document id,
|
||||
// retrieve the word and delete the positions.
|
||||
let mut iter = docid_word_positions.prefix_iter_mut(self.wtxn, &(docid, ""))?;
|
||||
while let Some(result) = iter.next() {
|
||||
@ -123,6 +133,24 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
|
||||
}
|
||||
}
|
||||
|
||||
let mut fields_distribution = self.index.fields_distribution(self.wtxn)?;
|
||||
|
||||
// We use pre-calculated number of fields occurrences that needs to be deleted
|
||||
// to reflect deleted documents.
|
||||
// If all field occurrences are removed, delete the entry from distribution.
|
||||
// Otherwise, insert new number of occurrences (current_count - count_diff).
|
||||
for (field_id, count_diff) in fields_ids_distribution_diff {
|
||||
let field_name = fields_ids_map.name(field_id).unwrap();
|
||||
if let Entry::Occupied(mut entry) = fields_distribution.entry(field_name.to_string()) {
|
||||
match entry.get().checked_sub(count_diff) {
|
||||
Some(0) | None => entry.remove(),
|
||||
Some(count) => entry.insert(count)
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
self.index.put_fields_distribution(self.wtxn, &fields_distribution)?;
|
||||
|
||||
// We create the FST map of the external ids that we must delete.
|
||||
external_ids.sort_unstable();
|
||||
let external_ids_to_delete = fst::Set::from_iter(external_ids.iter().map(AsRef::as_ref))?;
|
||||
@ -347,5 +375,9 @@ mod tests {
|
||||
builder.execute().unwrap();
|
||||
|
||||
wtxn.commit().unwrap();
|
||||
|
||||
let rtxn = index.read_txn().unwrap();
|
||||
|
||||
assert!(index.fields_distribution(&rtxn).unwrap().is_empty());
|
||||
}
|
||||
}
|
||||
|
@ -358,6 +358,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
||||
let TransformOutput {
|
||||
primary_key,
|
||||
fields_ids_map,
|
||||
fields_distribution,
|
||||
external_documents_ids,
|
||||
new_documents_ids,
|
||||
replaced_documents_ids,
|
||||
@ -551,6 +552,9 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
|
||||
// We write the fields ids map into the main database
|
||||
self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
|
||||
|
||||
// We write the fields distribution into the main database
|
||||
self.index.put_fields_distribution(self.wtxn, &fields_distribution)?;
|
||||
|
||||
// We write the primary key field id into the main database
|
||||
self.index.put_primary_key(self.wtxn, &primary_key)?;
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek, SeekFrom};
|
||||
use std::iter::Peekable;
|
||||
@ -10,7 +11,7 @@ use log::info;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde_json::{Map, Value};
|
||||
|
||||
use crate::{Index, BEU32, MergeFn, FieldsIdsMap, ExternalDocumentsIds, FieldId};
|
||||
use crate::{Index, BEU32, MergeFn, FieldsIdsMap, ExternalDocumentsIds, FieldId, FieldsDistribution};
|
||||
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
||||
use super::merge_function::merge_two_obkvs;
|
||||
use super::{create_writer, create_sorter, IndexDocumentsMethod};
|
||||
@ -20,6 +21,7 @@ const DEFAULT_PRIMARY_KEY_NAME: &str = "id";
|
||||
pub struct TransformOutput {
|
||||
pub primary_key: String,
|
||||
pub fields_ids_map: FieldsIdsMap,
|
||||
pub fields_distribution: FieldsDistribution,
|
||||
pub external_documents_ids: ExternalDocumentsIds<'static>,
|
||||
pub new_documents_ids: RoaringBitmap,
|
||||
pub replaced_documents_ids: RoaringBitmap,
|
||||
@ -74,6 +76,7 @@ impl Transform<'_, '_> {
|
||||
F: Fn(UpdateIndexingStep) + Sync,
|
||||
{
|
||||
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
|
||||
let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
|
||||
let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
|
||||
|
||||
// Deserialize the whole batch of documents in memory.
|
||||
@ -103,6 +106,7 @@ impl Transform<'_, '_> {
|
||||
return Ok(TransformOutput {
|
||||
primary_key,
|
||||
fields_ids_map,
|
||||
fields_distribution,
|
||||
external_documents_ids: ExternalDocumentsIds::default(),
|
||||
new_documents_ids: RoaringBitmap::new(),
|
||||
replaced_documents_ids: RoaringBitmap::new(),
|
||||
@ -133,6 +137,8 @@ impl Transform<'_, '_> {
|
||||
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
|
||||
let mut documents_count = 0;
|
||||
|
||||
let mut fields_ids_distribution = HashMap::new();
|
||||
|
||||
for result in documents {
|
||||
let document = result?;
|
||||
|
||||
@ -147,7 +153,9 @@ impl Transform<'_, '_> {
|
||||
|
||||
// We prepare the fields ids map with the documents keys.
|
||||
for (key, _value) in &document {
|
||||
fields_ids_map.insert(&key).context("field id limit reached")?;
|
||||
let field_id = fields_ids_map.insert(&key).context("field id limit reached")?;
|
||||
|
||||
*fields_ids_distribution.entry(field_id).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
// We retrieve the user id from the document based on the primary key name,
|
||||
@ -190,6 +198,11 @@ impl Transform<'_, '_> {
|
||||
documents_count += 1;
|
||||
}
|
||||
|
||||
for (field_id, count) in fields_ids_distribution {
|
||||
let field_name = fields_ids_map.name(field_id).unwrap();
|
||||
*fields_distribution.entry(field_name.to_string()).or_default() += count;
|
||||
}
|
||||
|
||||
progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
|
||||
documents_seen: documents_count,
|
||||
});
|
||||
@ -200,6 +213,7 @@ impl Transform<'_, '_> {
|
||||
sorter,
|
||||
primary_key,
|
||||
fields_ids_map,
|
||||
fields_distribution,
|
||||
documents_count,
|
||||
external_documents_ids,
|
||||
progress_callback,
|
||||
@ -212,6 +226,7 @@ impl Transform<'_, '_> {
|
||||
F: Fn(UpdateIndexingStep) + Sync,
|
||||
{
|
||||
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
|
||||
let mut fields_distribution = self.index.fields_distribution(self.rtxn)?;
|
||||
let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
|
||||
|
||||
let mut csv = csv::Reader::from_reader(reader);
|
||||
@ -269,6 +284,8 @@ impl Transform<'_, '_> {
|
||||
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
|
||||
let mut documents_count = 0;
|
||||
|
||||
let mut fields_ids_distribution = HashMap::new();
|
||||
|
||||
let mut record = csv::StringRecord::new();
|
||||
while csv.read_record(&mut record)? {
|
||||
obkv_buffer.clear();
|
||||
@ -307,6 +324,8 @@ impl Transform<'_, '_> {
|
||||
json_buffer.clear();
|
||||
serde_json::to_writer(&mut json_buffer, &field)?;
|
||||
writer.insert(*field_id, &json_buffer)?;
|
||||
|
||||
*fields_ids_distribution.entry(*field_id).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
// We use the extracted/generated user id as the key for this document.
|
||||
@ -314,6 +333,11 @@ impl Transform<'_, '_> {
|
||||
documents_count += 1;
|
||||
}
|
||||
|
||||
for (field_id, count) in fields_ids_distribution {
|
||||
let field_name = fields_ids_map.name(field_id).unwrap();
|
||||
*fields_distribution.entry(field_name.to_string()).or_default() += count;
|
||||
}
|
||||
|
||||
progress_callback(UpdateIndexingStep::TransformFromUserIntoGenericFormat {
|
||||
documents_seen: documents_count,
|
||||
});
|
||||
@ -328,6 +352,7 @@ impl Transform<'_, '_> {
|
||||
sorter,
|
||||
primary_key_name,
|
||||
fields_ids_map,
|
||||
fields_distribution,
|
||||
documents_count,
|
||||
external_documents_ids,
|
||||
progress_callback,
|
||||
@ -342,6 +367,7 @@ impl Transform<'_, '_> {
|
||||
sorter: grenad::Sorter<MergeFn>,
|
||||
primary_key: String,
|
||||
fields_ids_map: FieldsIdsMap,
|
||||
fields_distribution: FieldsDistribution,
|
||||
approximate_number_of_documents: usize,
|
||||
mut external_documents_ids: ExternalDocumentsIds<'_>,
|
||||
progress_callback: F,
|
||||
@ -439,6 +465,7 @@ impl Transform<'_, '_> {
|
||||
Ok(TransformOutput {
|
||||
primary_key,
|
||||
fields_ids_map,
|
||||
fields_distribution,
|
||||
external_documents_ids: external_documents_ids.into_static(),
|
||||
new_documents_ids,
|
||||
replaced_documents_ids,
|
||||
@ -457,6 +484,7 @@ impl Transform<'_, '_> {
|
||||
new_fields_ids_map: FieldsIdsMap,
|
||||
) -> anyhow::Result<TransformOutput>
|
||||
{
|
||||
let fields_distribution = self.index.fields_distribution(self.rtxn)?;
|
||||
let external_documents_ids = self.index.external_documents_ids(self.rtxn)?;
|
||||
let documents_ids = self.index.documents_ids(self.rtxn)?;
|
||||
let documents_count = documents_ids.len() as usize;
|
||||
@ -492,6 +520,7 @@ impl Transform<'_, '_> {
|
||||
Ok(TransformOutput {
|
||||
primary_key,
|
||||
fields_ids_map: new_fields_ids_map,
|
||||
fields_distribution,
|
||||
external_documents_ids: external_documents_ids.into_static(),
|
||||
new_documents_ids: documents_ids,
|
||||
replaced_documents_ids: RoaringBitmap::default(),
|
||||
|
@ -183,7 +183,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Udpates the index's searchable attributes. This causes the field map to be recomputed to
|
||||
/// Updates the index's searchable attributes. This causes the field map to be recomputed to
|
||||
/// reflect the order of the searchable attributes.
|
||||
fn update_searchable(&mut self) -> anyhow::Result<bool> {
|
||||
match self.searchable_fields {
|
||||
|
Loading…
x
Reference in New Issue
Block a user