4646: Reduce `Transform`'s disk usage r=Kerollmops a=Kerollmops

This PR implements what is described in #4485. It reduces the number of disk writes and disk usage.

Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-05-23 16:06:50 +00:00 committed by GitHub
commit 19acc65ad2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 197 additions and 103 deletions

View File

@ -680,6 +680,26 @@ async fn search_facet_distribution() {
}, },
) )
.await; .await;
index.update_settings(json!({"filterableAttributes": ["doggos.name"]})).await;
index.wait_task(5).await;
index
.search(
json!({
"facets": ["doggos.name"]
}),
|response, code| {
assert_eq!(code, 200, "{}", response);
let dist = response["facetDistribution"].as_object().unwrap();
assert_eq!(dist.len(), 1);
assert_eq!(
dist["doggos.name"],
json!({ "bobby": 1, "buddy": 1, "gros bill": 1, "turbo": 1, "fast": 1})
);
},
)
.await;
} }
#[actix_rt::test] #[actix_rt::test]

View File

@ -195,7 +195,7 @@ mod tests {
fn merge_cbo_roaring_bitmaps() { fn merge_cbo_roaring_bitmaps() {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let small_data = vec![ let small_data = [
RoaringBitmap::from_sorted_iter(1..4).unwrap(), RoaringBitmap::from_sorted_iter(1..4).unwrap(),
RoaringBitmap::from_sorted_iter(2..5).unwrap(), RoaringBitmap::from_sorted_iter(2..5).unwrap(),
RoaringBitmap::from_sorted_iter(4..6).unwrap(), RoaringBitmap::from_sorted_iter(4..6).unwrap(),
@ -209,7 +209,7 @@ mod tests {
let expected = RoaringBitmap::from_sorted_iter(1..6).unwrap(); let expected = RoaringBitmap::from_sorted_iter(1..6).unwrap();
assert_eq!(bitmap, expected); assert_eq!(bitmap, expected);
let medium_data = vec![ let medium_data = [
RoaringBitmap::from_sorted_iter(1..4).unwrap(), RoaringBitmap::from_sorted_iter(1..4).unwrap(),
RoaringBitmap::from_sorted_iter(2..5).unwrap(), RoaringBitmap::from_sorted_iter(2..5).unwrap(),
RoaringBitmap::from_sorted_iter(4..8).unwrap(), RoaringBitmap::from_sorted_iter(4..8).unwrap(),

View File

@ -354,8 +354,7 @@ pub fn is_faceted(field: &str, faceted_fields: impl IntoIterator<Item = impl AsR
/// assert!(!is_faceted_by("animaux.chien", "animaux.chie")); /// assert!(!is_faceted_by("animaux.chien", "animaux.chie"));
/// ``` /// ```
pub fn is_faceted_by(field: &str, facet: &str) -> bool { pub fn is_faceted_by(field: &str, facet: &str) -> bool {
field.starts_with(facet) field.starts_with(facet) && field[facet.len()..].chars().next().map_or(true, |c| c == '.')
&& field[facet.len()..].chars().next().map(|c| c == '.').unwrap_or(true)
} }
pub fn normalize_facet(original: &str) -> String { pub fn normalize_facet(original: &str) -> String {

View File

@ -6,6 +6,7 @@ mod typed_chunk;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use std::iter;
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::result::Result as StdResult; use std::result::Result as StdResult;
use std::sync::Arc; use std::sync::Arc;
@ -359,7 +360,10 @@ where
let min_chunk_size = 1024 * 512; // 512KiB let min_chunk_size = 1024 * 512; // 512KiB
// compute the chunk size from the number of available threads and the inputed data size. // compute the chunk size from the number of available threads and the inputed data size.
let total_size = flattened_documents.metadata().map(|m| m.len()); let total_size = match flattened_documents.as_ref() {
Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()),
None => Ok(default_chunk_size as u64),
};
let current_num_threads = pool.current_num_threads(); let current_num_threads = pool.current_num_threads();
// if we have more than 2 thread, create a number of chunk equal to 3/4 threads count // if we have more than 2 thread, create a number of chunk equal to 3/4 threads count
let chunk_count = if current_num_threads > 2 { let chunk_count = if current_num_threads > 2 {
@ -373,8 +377,14 @@ where
} }
}; };
let original_documents = grenad::Reader::new(original_documents)?; let original_documents = match original_documents {
let flattened_documents = grenad::Reader::new(flattened_documents)?; Some(original_documents) => Some(grenad::Reader::new(original_documents)?),
None => None,
};
let flattened_documents = match flattened_documents {
Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?),
None => None,
};
let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
@ -393,15 +403,23 @@ where
pool.install(|| { pool.install(|| {
rayon::spawn(move || { rayon::spawn(move || {
let child_span = tracing::trace_span!(target: "indexing::details", parent: &current_span, "extract_and_send_grenad_chunks"); let child_span = tracing::trace_span!(target: "indexing::details", parent: &current_span, "extract_and_send_grenad_chunks");
let _enter = child_span.enter(); let _enter = child_span.enter();
puffin::profile_scope!("extract_and_send_grenad_chunks"); puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks // split obkv file into several chunks
let original_chunk_iter = let original_chunk_iter = match original_documents {
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size); Some(original_documents) => {
grenad_obkv_into_chunks(original_documents,pool_params,documents_chunk_size).map(either::Left)
},
None => Ok(either::Right(iter::empty())),
};
// split obkv file into several chunks // split obkv file into several chunks
let flattened_chunk_iter = let flattened_chunk_iter = match flattened_documents {
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size); Some(flattened_documents) => {
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left)
},
None => Ok(either::Right(iter::empty())),
};
let result = original_chunk_iter.and_then(|original_chunk| { let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?; let flattened_chunk = flattened_chunk_iter?;

View File

@ -1,7 +1,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::btree_map::Entry as BEntry; use std::collections::btree_map::Entry as BEntry;
use std::collections::hash_map::Entry as HEntry; use std::collections::hash_map::Entry as HEntry;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
@ -20,21 +20,21 @@ use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key}; use crate::index::{db_name, main_key};
use crate::update::del_add::{ use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd};
del_add_from_two_obkvs, into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd,
};
use crate::update::index_documents::GrenadParameters; use crate::update::index_documents::GrenadParameters;
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result}; use crate::{
is_faceted_by, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result,
};
pub struct TransformOutput { pub struct TransformOutput {
pub primary_key: String, pub primary_key: String,
pub settings_diff: InnerIndexSettingsDiff, pub settings_diff: InnerIndexSettingsDiff,
pub field_distribution: FieldDistribution, pub field_distribution: FieldDistribution,
pub documents_count: usize, pub documents_count: usize,
pub original_documents: File, pub original_documents: Option<File>,
pub flattened_documents: File, pub flattened_documents: Option<File>,
} }
/// Extract the external ids, deduplicate and compute the new internal documents ids /// Extract the external ids, deduplicate and compute the new internal documents ids
@ -808,11 +808,15 @@ impl<'a, 'i> Transform<'a, 'i> {
})?; })?;
let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?; let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?;
let fields_ids_map = self.fields_ids_map;
let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name));
let mut new_inner_settings = old_inner_settings.clone(); let mut new_inner_settings = old_inner_settings.clone();
new_inner_settings.fields_ids_map = self.fields_ids_map; new_inner_settings.fields_ids_map = fields_ids_map;
let settings_diff = InnerIndexSettingsDiff { let settings_diff = InnerIndexSettingsDiff {
old: old_inner_settings, old: old_inner_settings,
new: new_inner_settings, new: new_inner_settings,
primary_key_id,
embedding_configs_updated: false, embedding_configs_updated: false,
settings_update_only: false, settings_update_only: false,
}; };
@ -822,10 +826,12 @@ impl<'a, 'i> Transform<'a, 'i> {
settings_diff, settings_diff,
field_distribution, field_distribution,
documents_count: self.documents_count, documents_count: self.documents_count,
original_documents: original_documents.into_inner().map_err(|err| err.into_error())?, original_documents: Some(
flattened_documents: flattened_documents original_documents.into_inner().map_err(|err| err.into_error())?,
.into_inner() ),
.map_err(|err| err.into_error())?, flattened_documents: Some(
flattened_documents.into_inner().map_err(|err| err.into_error())?,
),
}) })
} }
@ -835,34 +841,66 @@ impl<'a, 'i> Transform<'a, 'i> {
fn rebind_existing_document( fn rebind_existing_document(
old_obkv: KvReader<FieldId>, old_obkv: KvReader<FieldId>,
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
original_obkv_buffer: &mut Vec<u8>, modified_faceted_fields: &HashSet<String>,
flattened_obkv_buffer: &mut Vec<u8>, original_obkv_buffer: Option<&mut Vec<u8>>,
flattened_obkv_buffer: Option<&mut Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone(); // Always keep the primary key.
let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone(); let is_primary_key = |id: FieldId| -> bool { settings_diff.primary_key_id == Some(id) };
// If only the `searchableAttributes` has been changed, keep only the searchable fields.
let must_reindex_searchables = settings_diff.reindex_searchable();
let necessary_searchable_field = |id: FieldId| -> bool {
must_reindex_searchables
&& (settings_diff.old.searchable_fields_ids.contains(&id)
|| settings_diff.new.searchable_fields_ids.contains(&id))
};
// If only a faceted field has been added, keep only this field.
let must_reindex_facets = settings_diff.reindex_facets();
let necessary_faceted_field = |id: FieldId| -> bool {
let field_name = settings_diff.new.fields_ids_map.name(id).unwrap();
must_reindex_facets
&& modified_faceted_fields
.iter()
.any(|long| is_faceted_by(long, field_name) || is_faceted_by(field_name, long))
};
// Alway provide all fields when vectors are involved because
// we need the fields for the prompt/templating.
let reindex_vectors = settings_diff.reindex_vectors();
let mut obkv_writer = KvWriter::<_, FieldId>::memory(); let mut obkv_writer = KvWriter::<_, FieldId>::memory();
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. for (id, val) in old_obkv.iter() {
for (id, name) in new_fields_ids_map.iter() { if is_primary_key(id)
if let Some(val) = old_fields_ids_map.id(name).and_then(|id| old_obkv.get(id)) { || necessary_searchable_field(id)
|| necessary_faceted_field(id)
|| reindex_vectors
{
obkv_writer.insert(id, val)?; obkv_writer.insert(id, val)?;
} }
} }
let data = obkv_writer.into_inner()?; let data = obkv_writer.into_inner()?;
let new_obkv = KvReader::<FieldId>::new(&data); let obkv = KvReader::<FieldId>::new(&data);
// take the non-flattened version if flatten_from_fields_ids_map returns None. if let Some(original_obkv_buffer) = original_obkv_buffer {
let old_flattened = Self::flatten_from_fields_ids_map(&old_obkv, &mut old_fields_ids_map)?; original_obkv_buffer.clear();
let old_flattened = into_del_add_obkv(obkv, DelAddOperation::DeletionAndAddition, original_obkv_buffer)?;
old_flattened.as_deref().map_or_else(|| old_obkv, KvReader::<FieldId>::new); }
let new_flattened = Self::flatten_from_fields_ids_map(&new_obkv, &mut new_fields_ids_map)?;
let new_flattened =
new_flattened.as_deref().map_or_else(|| new_obkv, KvReader::<FieldId>::new);
original_obkv_buffer.clear(); if let Some(flattened_obkv_buffer) = flattened_obkv_buffer {
flattened_obkv_buffer.clear(); // take the non-flattened version if flatten_from_fields_ids_map returns None.
let mut fields_ids_map = settings_diff.new.fields_ids_map.clone();
let flattened = Self::flatten_from_fields_ids_map(&obkv, &mut fields_ids_map)?;
let flattened = flattened.as_deref().map_or(obkv, KvReader::new);
del_add_from_two_obkvs(&old_obkv, &new_obkv, original_obkv_buffer)?; flattened_obkv_buffer.clear();
del_add_from_two_obkvs(&old_flattened, &new_flattened, flattened_obkv_buffer)?; into_del_add_obkv(
flattened,
DelAddOperation::DeletionAndAddition,
flattened_obkv_buffer,
)?;
}
Ok(()) Ok(())
} }
@ -891,46 +929,63 @@ impl<'a, 'i> Transform<'a, 'i> {
let documents_count = documents_ids.len() as usize; let documents_count = documents_ids.len() as usize;
// We initialize the sorter with the user indexing settings. // We initialize the sorter with the user indexing settings.
let mut original_sorter = create_sorter( let mut original_sorter = if settings_diff.reindex_vectors() {
grenad::SortAlgorithm::Stable, Some(create_sorter(
keep_first, grenad::SortAlgorithm::Stable,
self.indexer_settings.chunk_compression_type, keep_first,
self.indexer_settings.chunk_compression_level, self.indexer_settings.chunk_compression_type,
self.indexer_settings.max_nb_chunks, self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_memory.map(|mem| mem / 2), self.indexer_settings.max_nb_chunks,
); self.indexer_settings.max_memory.map(|mem| mem / 2),
))
} else {
None
};
// We initialize the sorter with the user indexing settings. // We initialize the sorter with the user indexing settings.
let mut flattened_sorter = create_sorter( let mut flattened_sorter =
grenad::SortAlgorithm::Stable, if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
keep_first, Some(create_sorter(
self.indexer_settings.chunk_compression_type, grenad::SortAlgorithm::Stable,
self.indexer_settings.chunk_compression_level, keep_first,
self.indexer_settings.max_nb_chunks, self.indexer_settings.chunk_compression_type,
self.indexer_settings.max_memory.map(|mem| mem / 2), self.indexer_settings.chunk_compression_level,
); self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
))
} else {
None
};
let mut original_obkv_buffer = Vec::new(); if original_sorter.is_some() || flattened_sorter.is_some() {
let mut flattened_obkv_buffer = Vec::new(); let modified_faceted_fields = settings_diff.modified_faceted_fields();
let mut document_sorter_key_buffer = Vec::new(); let mut original_obkv_buffer = Vec::new();
for result in self.index.external_documents_ids().iter(wtxn)? { let mut flattened_obkv_buffer = Vec::new();
let (external_id, docid) = result?; let mut document_sorter_key_buffer = Vec::new();
let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or( for result in self.index.external_documents_ids().iter(wtxn)? {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, let (external_id, docid) = result?;
)?; let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
Self::rebind_existing_document( Self::rebind_existing_document(
old_obkv, old_obkv,
&settings_diff, &settings_diff,
&mut original_obkv_buffer, &modified_faceted_fields,
&mut flattened_obkv_buffer, Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()),
)?; Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()),
)?;
document_sorter_key_buffer.clear(); if let Some(original_sorter) = original_sorter.as_mut() {
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); document_sorter_key_buffer.clear();
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?; document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?; original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
}
if let Some(flattened_sorter) = flattened_sorter.as_mut() {
flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
}
}
} }
let grenad_params = GrenadParameters { let grenad_params = GrenadParameters {
@ -941,17 +996,22 @@ impl<'a, 'i> Transform<'a, 'i> {
}; };
// Once we have written all the documents, we merge everything into a Reader. // Once we have written all the documents, we merge everything into a Reader.
let original_documents = sorter_into_reader(original_sorter, grenad_params)?; let flattened_documents = match flattened_sorter {
Some(flattened_sorter) => Some(sorter_into_reader(flattened_sorter, grenad_params)?),
let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?; None => None,
};
let original_documents = match original_sorter {
Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?),
None => None,
};
Ok(TransformOutput { Ok(TransformOutput {
primary_key, primary_key,
field_distribution, field_distribution,
settings_diff, settings_diff,
documents_count, documents_count,
original_documents: original_documents.into_inner().into_inner(), original_documents: original_documents.map(|od| od.into_inner().into_inner()),
flattened_documents: flattened_documents.into_inner().into_inner(), flattened_documents: flattened_documents.map(|fd| fd.into_inner().into_inner()),
}) })
} }
} }

View File

@ -1067,10 +1067,17 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
// 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage // 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage
let embedding_configs_updated = self.update_embedding_configs()?; let embedding_configs_updated = self.update_embedding_configs()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?; let mut new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?;
new_inner_settings.recompute_facets(self.wtxn, self.index)?;
let primary_key_id = self
.index
.primary_key(self.wtxn)?
.and_then(|name| new_inner_settings.fields_ids_map.id(name));
let inner_settings_diff = InnerIndexSettingsDiff { let inner_settings_diff = InnerIndexSettingsDiff {
old: old_inner_settings, old: old_inner_settings,
new: new_inner_settings, new: new_inner_settings,
primary_key_id,
embedding_configs_updated, embedding_configs_updated,
settings_update_only: true, settings_update_only: true,
}; };
@ -1086,10 +1093,9 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
pub struct InnerIndexSettingsDiff { pub struct InnerIndexSettingsDiff {
pub(crate) old: InnerIndexSettings, pub(crate) old: InnerIndexSettings,
pub(crate) new: InnerIndexSettings, pub(crate) new: InnerIndexSettings,
pub(crate) primary_key_id: Option<FieldId>,
// TODO: compare directly the embedders. // TODO: compare directly the embedders.
pub(crate) embedding_configs_updated: bool, pub(crate) embedding_configs_updated: bool,
pub(crate) settings_update_only: bool, pub(crate) settings_update_only: bool,
} }
@ -1099,13 +1105,8 @@ impl InnerIndexSettingsDiff {
} }
pub fn reindex_searchable(&self) -> bool { pub fn reindex_searchable(&self) -> bool {
self.old self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
.fields_ids_map != self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
.iter()
.zip(self.new.fields_ids_map.iter())
.any(|(old, new)| old != new)
|| self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
!= self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
|| self.old.allowed_separators != self.new.allowed_separators || self.old.allowed_separators != self.new.allowed_separators
|| self.old.dictionary != self.new.dictionary || self.old.dictionary != self.new.dictionary
|| self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields || self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields
@ -1132,15 +1133,7 @@ impl InnerIndexSettingsDiff {
return true; return true;
} }
let faceted_updated = (existing_fields - old_faceted_fields) != (existing_fields - new_faceted_fields)
(existing_fields - old_faceted_fields) != (existing_fields - new_faceted_fields);
self.old
.fields_ids_map
.iter()
.zip(self.new.fields_ids_map.iter())
.any(|(old, new)| old != new)
|| faceted_updated
} }
pub fn reindex_vectors(&self) -> bool { pub fn reindex_vectors(&self) -> bool {
@ -1150,6 +1143,10 @@ impl InnerIndexSettingsDiff {
pub fn settings_update_only(&self) -> bool { pub fn settings_update_only(&self) -> bool {
self.settings_update_only self.settings_update_only
} }
pub fn modified_faceted_fields(&self) -> HashSet<String> {
&self.old.user_defined_faceted_fields ^ &self.new.user_defined_faceted_fields
}
} }
#[derive(Clone)] #[derive(Clone)]