Merge branch 'main' into merge-release-v1.8.1-in-main

This commit is contained in:
Many the fish 2024-05-29 11:31:03 +02:00 committed by GitHub
commit e1fbfde6c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
105 changed files with 5863 additions and 1031 deletions

View file

@ -21,8 +21,6 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
name = "clear_documents"
)]
pub fn execute(self) -> Result<u64> {
puffin::profile_function!();
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let Index {
env: _env,

View file

@ -499,7 +499,7 @@ impl FacetsUpdateIncrementalInner {
ModificationResult::Expand | ModificationResult::Reduce { .. }
)
{
// if any modification occured, insert it in the database.
// if any modification occurred, insert it in the database.
self.db.put(txn, &insertion_key.as_ref(), &updated_value)?;
Ok(insertion_key_modification)
} else {

View file

@ -379,7 +379,7 @@ pub(crate) mod test_helpers {
let mut options = heed::EnvOpenOptions::new();
let options = options.map_size(4096 * 4 * 1000 * 100);
let tempdir = tempfile::TempDir::new().unwrap();
let env = options.open(tempdir.path()).unwrap();
let env = unsafe { options.open(tempdir.path()) }.unwrap();
let mut wtxn = env.write_txn().unwrap();
let content = env.create_database(&mut wtxn, None).unwrap();
wtxn.commit().unwrap();

View file

@ -29,8 +29,6 @@ pub fn enrich_documents_batch<R: Read + Seek>(
autogenerate_docids: bool,
reader: DocumentsBatchReader<R>,
) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
puffin::profile_function!();
let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
let mut external_ids = tempfile::tempfile().map(BufWriter::new).map(grenad::Writer::new)?;

View file

@ -29,8 +29,6 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
settings_diff: &InnerIndexSettingsDiff,
max_positions_per_attributes: Option<u32>,
) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread();
@ -186,7 +184,7 @@ fn searchable_fields_changed(
) -> bool {
let searchable_fields = &settings_diff.new.searchable_fields_ids;
for (field_id, field_bytes) in obkv.iter() {
if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
if searchable_fields.contains(&field_id) {
let del_add = KvReaderDelAdd::new(field_bytes);
match (del_add.get(DelAdd::Deletion), del_add.get(DelAdd::Addition)) {
// if both fields are None, check the next field.
@ -298,7 +296,7 @@ fn lang_safe_tokens_from_document<'a>(
/// Extract words mapped with their positions of a document.
fn tokens_from_document<'a>(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<Vec<FieldId>>,
searchable_fields: &[FieldId],
tokenizer: &Tokenizer,
max_positions_per_attributes: u32,
del_add: DelAdd,
@ -309,7 +307,7 @@ fn tokens_from_document<'a>(
let mut document_writer = KvWriterU16::new(&mut buffers.obkv_buffer);
for (field_id, field_bytes) in obkv.iter() {
// if field is searchable.
if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
if searchable_fields.as_ref().contains(&field_id) {
// extract deletion or addition only.
if let Some(field_bytes) = KvReaderDelAdd::new(field_bytes).get(del_add) {
// parse json.

View file

@ -23,8 +23,6 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut facet_number_docids_sorter = create_sorter(

View file

@ -28,8 +28,6 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let options = NormalizerOption { lossy: true, ..Default::default() };

View file

@ -37,7 +37,7 @@ pub struct ExtractedFacetValues {
/// Extracts the facet values of each faceted field of each document.
///
/// Returns the generated grenad reader containing the docid the fid and the orginal value as key
/// Returns the generated grenad reader containing the docid the fid and the original value as key
/// and the normalized value as value extracted from the given chunk of documents.
/// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
@ -46,8 +46,6 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff,
) -> Result<ExtractedFacetValues> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter(

View file

@ -26,8 +26,6 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter(

View file

@ -21,8 +21,6 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
primary_key_id: FieldId,
settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,

View file

@ -10,16 +10,16 @@ use bytemuck::cast_slice;
use grenad::Writer;
use itertools::EitherOrBoth;
use ordered_float::OrderedFloat;
use serde_json::{from_slice, Value};
use serde_json::Value;
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
use crate::error::UserError;
use crate::prompt::Prompt;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::helpers::try_split_at;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::parsed_vectors::{ParsedVectorsDiff, RESERVED_VECTORS_FIELD_NAME};
use crate::vector::Embedder;
use crate::{DocumentId, InternalError, Result, ThreadPoolNoAbort, VectorOrArrayOfVectors};
use crate::{DocumentId, Result, ThreadPoolNoAbort};
/// The length of the elements that are always in the buffer when inserting new values.
const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
@ -31,6 +31,10 @@ pub struct ExtractedVectorPoints {
pub remove_vectors: grenad::Reader<BufReader<File>>,
// docid -> prompt
pub prompts: grenad::Reader<BufReader<File>>,
// embedder
pub embedder_name: String,
pub embedder: Arc<Embedder>,
}
enum VectorStateDelta {
@ -65,6 +69,19 @@ impl VectorStateDelta {
}
}
struct EmbedderVectorExtractor {
embedder_name: String,
embedder: Arc<Embedder>,
prompt: Arc<Prompt>,
// (docid, _index) -> KvWriterDelAdd -> Vector
manual_vectors_writer: Writer<BufWriter<File>>,
// (docid) -> (prompt)
prompts_writer: Writer<BufWriter<File>>,
// (docid) -> ()
remove_vectors_writer: Writer<BufWriter<File>>,
}
/// Extracts the embedding vector contained in each document under the `_vectors` field.
///
/// Returns the generated grenad reader containing the docid as key associated to the Vec<f32>
@ -73,34 +90,52 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff,
prompt: &Prompt,
embedder_name: &str,
) -> Result<ExtractedVectorPoints> {
puffin::profile_function!();
) -> Result<Vec<ExtractedVectorPoints>> {
let reindex_vectors = settings_diff.reindex_vectors();
let old_fields_ids_map = &settings_diff.old.fields_ids_map;
let new_fields_ids_map = &settings_diff.new.fields_ids_map;
// the vector field id may have changed
let old_vectors_fid = old_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME);
// filter the old vector fid if the settings has been changed forcing reindexing.
let old_vectors_fid = old_vectors_fid.filter(|_| !reindex_vectors);
// (docid, _index) -> KvWriterDelAdd -> Vector
let mut manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let new_vectors_fid = new_fields_ids_map.id(RESERVED_VECTORS_FIELD_NAME);
// (docid) -> (prompt)
let mut prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let mut extractors = Vec::new();
for (embedder_name, (embedder, prompt)) in
settings_diff.new.embedding_configs.clone().into_iter()
{
// (docid, _index) -> KvWriterDelAdd -> Vector
let manual_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> ()
let mut remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> (prompt)
let prompts_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
// (docid) -> ()
let remove_vectors_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
extractors.push(EmbedderVectorExtractor {
embedder_name,
embedder,
prompt,
manual_vectors_writer,
prompts_writer,
remove_vectors_writer,
});
}
let mut key_buffer = Vec::new();
let mut cursor = obkv_documents.into_cursor()?;
@ -114,152 +149,138 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
key_buffer.clear();
key_buffer.extend_from_slice(docid_bytes);
// since we only needs the primary key when we throw an error we create this getter to
// since we only need the primary key when we throw an error we create this getter to
// lazily get it when needed
let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() };
// the vector field id may have changed
let old_vectors_fid = old_fields_ids_map.id("_vectors");
// filter the old vector fid if the settings has been changed forcing reindexing.
let old_vectors_fid = old_vectors_fid.filter(|_| !settings_diff.reindex_vectors());
let mut parsed_vectors = ParsedVectorsDiff::new(obkv, old_vectors_fid, new_vectors_fid)
.map_err(|error| error.to_crate_error(document_id().to_string()))?;
let new_vectors_fid = new_fields_ids_map.id("_vectors");
let vectors_field = {
let del = old_vectors_fid
.and_then(|vectors_fid| obkv.get(vectors_fid))
.map(KvReaderDelAdd::new)
.map(|obkv| to_vector_map(obkv, DelAdd::Deletion, &document_id))
.transpose()?
.flatten();
let add = new_vectors_fid
.and_then(|vectors_fid| obkv.get(vectors_fid))
.map(KvReaderDelAdd::new)
.map(|obkv| to_vector_map(obkv, DelAdd::Addition, &document_id))
.transpose()?
.flatten();
(del, add)
};
for EmbedderVectorExtractor {
embedder_name,
embedder: _,
prompt,
manual_vectors_writer,
prompts_writer,
remove_vectors_writer,
} in extractors.iter_mut()
{
let delta = match parsed_vectors.remove(embedder_name) {
(Some(old), Some(new)) => {
// no autogeneration
let del_vectors = old.into_array_of_vectors();
let add_vectors = new.into_array_of_vectors();
let (del_map, add_map) = vectors_field;
let del_value = del_map.and_then(|mut map| map.remove(embedder_name));
let add_value = add_map.and_then(|mut map| map.remove(embedder_name));
let delta = match (del_value, add_value) {
(Some(old), Some(new)) => {
// no autogeneration
let del_vectors = extract_vectors(old, document_id, embedder_name)?;
let add_vectors = extract_vectors(new, document_id, embedder_name)?;
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::ManualDelta(del_vectors, add_vectors)
}
(Some(_old), None) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// becomes autogenerated
VectorStateDelta::NowGenerated(prompt.render(
obkv,
DelAdd::Addition,
new_fields_ids_map,
)?)
} else {
VectorStateDelta::NowRemoved
}
}
(None, Some(new)) => {
// was possibly autogenerated, remove all vectors for that document
let add_vectors = extract_vectors(new, document_id, embedder_name)?;
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::WasGeneratedNowManual(add_vectors)
}
(None, None) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// Don't give up if the old prompt was failing
let old_prompt = Some(prompt)
// TODO: this filter works because we erase the vec database when a embedding setting changes.
// When vector pipeline will be optimized, this should be removed.
.filter(|_| !settings_diff.reindex_vectors())
.map(|p| {
p.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or_default()
});
let new_prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
if old_prompt.as_ref() != Some(&new_prompt) {
let old_prompt = old_prompt.unwrap_or_default();
tracing::trace!(
"🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
);
VectorStateDelta::NowGenerated(new_prompt)
} else {
tracing::trace!("⏭️ Prompt unmodified, skipping");
VectorStateDelta::NoChange
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
} else {
VectorStateDelta::NowRemoved
}
}
};
// and we finally push the unique vectors into the writer
push_vectors_diff(
&mut remove_vectors_writer,
&mut prompts_writer,
&mut manual_vectors_writer,
&mut key_buffer,
delta,
settings_diff,
)?;
VectorStateDelta::ManualDelta(del_vectors, add_vectors)
}
(Some(_old), None) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// becomes autogenerated
VectorStateDelta::NowGenerated(prompt.render(
obkv,
DelAdd::Addition,
new_fields_ids_map,
)?)
} else {
VectorStateDelta::NowRemoved
}
}
(None, Some(new)) => {
// was possibly autogenerated, remove all vectors for that document
let add_vectors = new.into_array_of_vectors();
if add_vectors.len() > usize::from(u8::MAX) {
return Err(crate::Error::UserError(crate::UserError::TooManyVectors(
document_id().to_string(),
add_vectors.len(),
)));
}
VectorStateDelta::WasGeneratedNowManual(add_vectors)
}
(None, None) => {
// Do we keep this document?
let document_is_kept = obkv
.iter()
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
if document_is_kept {
// Don't give up if the old prompt was failing
let old_prompt = Some(&prompt)
// TODO: this filter works because we erase the vec database when a embedding setting changes.
// When vector pipeline will be optimized, this should be removed.
.filter(|_| !settings_diff.reindex_vectors())
.map(|p| {
p.render(obkv, DelAdd::Deletion, old_fields_ids_map)
.unwrap_or_default()
});
let new_prompt =
prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
if old_prompt.as_ref() != Some(&new_prompt) {
let old_prompt = old_prompt.unwrap_or_default();
tracing::trace!(
"🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
);
VectorStateDelta::NowGenerated(new_prompt)
} else {
tracing::trace!("⏭️ Prompt unmodified, skipping");
VectorStateDelta::NoChange
}
} else {
VectorStateDelta::NowRemoved
}
}
};
// and we finally push the unique vectors into the writer
push_vectors_diff(
remove_vectors_writer,
prompts_writer,
manual_vectors_writer,
&mut key_buffer,
delta,
reindex_vectors,
)?;
}
}
Ok(ExtractedVectorPoints {
// docid, _index -> KvWriterDelAdd -> Vector
manual_vectors: writer_into_reader(manual_vectors_writer)?,
// docid -> ()
remove_vectors: writer_into_reader(remove_vectors_writer)?,
// docid -> prompt
prompts: writer_into_reader(prompts_writer)?,
})
}
let mut results = Vec::new();
fn to_vector_map(
obkv: KvReaderDelAdd,
side: DelAdd,
document_id: &impl Fn() -> Value,
) -> Result<Option<serde_json::Map<String, Value>>> {
Ok(if let Some(value) = obkv.get(side) {
let Ok(value) = from_slice(value) else {
let value = from_slice(value).map_err(InternalError::SerdeJson)?;
return Err(crate::Error::UserError(UserError::InvalidVectorsMapType {
document_id: document_id(),
value,
}));
};
Some(value)
} else {
None
})
for EmbedderVectorExtractor {
embedder_name,
embedder,
prompt: _,
manual_vectors_writer,
prompts_writer,
remove_vectors_writer,
} in extractors
{
results.push(ExtractedVectorPoints {
// docid, _index -> KvWriterDelAdd -> Vector
manual_vectors: writer_into_reader(manual_vectors_writer)?,
// docid -> ()
remove_vectors: writer_into_reader(remove_vectors_writer)?,
// docid -> prompt
prompts: writer_into_reader(prompts_writer)?,
embedder,
embedder_name,
})
}
Ok(results)
}
/// Computes the diff between both Del and Add numbers and
@ -270,14 +291,13 @@ fn push_vectors_diff(
manual_vectors_writer: &mut Writer<BufWriter<File>>,
key_buffer: &mut Vec<u8>,
delta: VectorStateDelta,
settings_diff: &InnerIndexSettingsDiff,
reindex_vectors: bool,
) -> Result<()> {
puffin::profile_function!();
let (must_remove, prompt, (mut del_vectors, mut add_vectors)) = delta.into_values();
if must_remove
// TODO: the below condition works because we erase the vec database when a embedding setting changes.
// When vector pipeline will be optimized, this should be removed.
&& !settings_diff.reindex_vectors()
&& !reindex_vectors
{
key_buffer.truncate(TRUNCATE_SIZE);
remove_vectors_writer.insert(&key_buffer, [])?;
@ -308,7 +328,7 @@ fn push_vectors_diff(
EitherOrBoth::Left(vector) => {
// TODO: the below condition works because we erase the vec database when a embedding setting changes.
// When vector pipeline will be optimized, this should be removed.
if !settings_diff.reindex_vectors() {
if !reindex_vectors {
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those vectors.
let mut obkv = KvWriterDelAdd::memory();
@ -336,26 +356,6 @@ fn compare_vectors(a: &[f32], b: &[f32]) -> Ordering {
a.iter().copied().map(OrderedFloat).cmp(b.iter().copied().map(OrderedFloat))
}
/// Extracts the vectors from a JSON value.
fn extract_vectors(
value: Value,
document_id: impl Fn() -> Value,
name: &str,
) -> Result<Vec<Vec<f32>>> {
// FIXME: ugly clone of the vectors here
match serde_json::from_value(value.clone()) {
Ok(vectors) => {
Ok(VectorOrArrayOfVectors::into_array_of_vectors(vectors).unwrap_or_default())
}
Err(_) => Err(UserError::InvalidVectorsType {
document_id: document_id(),
value,
subfield: name.to_owned(),
}
.into()),
}
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
pub fn extract_embeddings<R: io::Read + io::Seek>(
// docid, prompt
@ -364,7 +364,6 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
embedder: Arc<Embedder>,
request_threads: &ThreadPoolNoAbort,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism
let n_vectors_per_chunk = embedder.prompt_count_in_chunk_hint(); // number of vectors in a single chunk

View file

@ -36,8 +36,6 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter(
@ -167,8 +165,6 @@ fn words_into_sorter(
add_words: &BTreeSet<Vec<u8>>,
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
puffin::profile_function!();
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};

View file

@ -26,7 +26,6 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord;
let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
@ -71,8 +70,6 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
// if we change document, we fill the sorter
if current_document_id.map_or(false, |id| id != document_id) {
puffin::profile_scope!("Document into sorter");
// FIXME: span inside of a hot loop might degrade performance and create big reports
let span = tracing::trace_span!(target: "indexing::details", "document_into_sorter");
let _entered = span.enter();
@ -163,7 +160,6 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
}
if let Some(document_id) = current_document_id {
puffin::profile_scope!("Final document into sorter");
// FIXME: span inside of a hot loop might degrade performance and create big reports
let span = tracing::trace_span!(target: "indexing::details", "final_document_into_sorter");
let _entered = span.enter();
@ -176,7 +172,6 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
)?;
}
{
puffin::profile_scope!("sorter_into_reader");
// FIXME: span inside of a hot loop might degrade performance and create big reports
let span = tracing::trace_span!(target: "indexing::details", "sorter_into_reader");
let _entered = span.enter();

View file

@ -25,8 +25,6 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter(
@ -104,8 +102,6 @@ fn words_position_into_sorter(
add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
puffin::profile_function!();
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};

View file

@ -46,8 +46,6 @@ pub(crate) fn data_from_obkv_documents(
settings_diff: Arc<InnerIndexSettingsDiff>,
max_positions_per_attributes: Option<u32>,
) -> Result<()> {
puffin::profile_function!();
let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
|| {
original_obkv_chunks
@ -88,7 +86,6 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(),
extract_fid_word_count_docids,
TypedChunk::FieldIdWordCountDocids,
"field-id-wordcount-docids",
);
run_extraction_task::<
_,
@ -115,7 +112,6 @@ pub(crate) fn data_from_obkv_documents(
word_fid_docids_reader,
}
},
"word-docids",
);
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
@ -125,7 +121,6 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(),
extract_word_position_docids,
TypedChunk::WordPositionDocids,
"word-position-docids",
);
run_extraction_task::<
@ -139,7 +134,6 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(),
extract_facet_string_docids,
TypedChunk::FieldIdFacetStringDocids,
"field-id-facet-string-docids",
);
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
@ -149,7 +143,6 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(),
extract_facet_number_docids,
TypedChunk::FieldIdFacetNumberDocids,
"field-id-facet-number-docids",
);
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
@ -159,7 +152,6 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(),
extract_word_pair_proximity_docids,
TypedChunk::WordPairProximityDocids,
"word-pair-proximity-docids",
);
}
@ -183,7 +175,6 @@ fn run_extraction_task<FE, FS, M>(
lmdb_writer_sx: Sender<Result<TypedChunk>>,
extract_fn: FE,
serialize_fn: FS,
name: &'static str,
) where
FE: Fn(
grenad::Reader<CursorClonableMmap>,
@ -201,7 +192,7 @@ fn run_extraction_task<FE, FS, M>(
rayon::spawn(move || {
let child_span = tracing::trace_span!(target: "indexing::extract::details", parent: &current_span, "extract_multiple_chunks");
let _entered = child_span.enter();
puffin::profile_scope!("extract_multiple_chunks", name);
match extract_fn(chunk, indexer, &settings_diff) {
Ok(chunk) => {
let _ = lmdb_writer_sx.send(Ok(serialize_fn(chunk)));
@ -224,27 +215,31 @@ fn send_original_documents_data(
let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
let documents_chunk_cloned = original_documents_chunk.clone();
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
let request_threads = ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()?;
if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
// no point in indexing vectors without embedders
&& (!settings_diff.new.embedding_configs.inner_as_ref().is_empty());
if index_vectors {
let settings_diff = settings_diff.clone();
let original_documents_chunk = original_documents_chunk.clone();
let lmdb_writer_sx = lmdb_writer_sx.clone();
rayon::spawn(move || {
for (name, (embedder, prompt)) in settings_diff.new.embedding_configs.clone() {
let result = extract_vector_points(
documents_chunk_cloned.clone(),
indexer,
&settings_diff,
&prompt,
&name,
);
match result {
Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => {
match extract_vector_points(original_documents_chunk.clone(), indexer, &settings_diff) {
Ok(extracted_vectors) => {
for ExtractedVectorPoints {
manual_vectors,
remove_vectors,
prompts,
embedder_name,
embedder,
} in extracted_vectors
{
let embeddings = match extract_embeddings(
prompts,
indexer,
@ -253,28 +248,26 @@ fn send_original_documents_data(
) {
Ok(results) => Some(results),
Err(error) => {
let _ = lmdb_writer_sx_cloned.send(Err(error));
let _ = lmdb_writer_sx.send(Err(error));
None
}
};
if !(remove_vectors.is_empty()
&& manual_vectors.is_empty()
&& embeddings.as_ref().map_or(true, |e| e.is_empty()))
{
let _ = lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints {
let _ = lmdb_writer_sx.send(Ok(TypedChunk::VectorPoints {
remove_vectors,
embeddings,
expected_dimension: embedder.dimensions(),
manual_vectors,
embedder_name: name,
embedder_name,
}));
}
}
Err(error) => {
let _ = lmdb_writer_sx_cloned.send(Err(error));
}
}
Err(error) => {
let _ = lmdb_writer_sx.send(Err(error));
}
}
});

View file

@ -61,7 +61,6 @@ pub fn sorter_into_reader(
sorter: grenad::Sorter<MergeFn>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
@ -182,8 +181,6 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
{
puffin::profile_function!();
let mut buffer = Vec::new();
let database = database.remap_types::<Bytes, Bytes>();

View file

@ -6,6 +6,7 @@ mod typed_chunk;
use std::collections::{HashMap, HashSet};
use std::io::{Read, Seek};
use std::iter;
use std::num::NonZeroU32;
use std::result::Result as StdResult;
use std::sync::Arc;
@ -140,8 +141,6 @@ where
mut self,
reader: DocumentsBatchReader<R>,
) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add
if reader.is_empty() {
return Ok((self, Ok(0)));
@ -186,8 +185,6 @@ where
mut self,
to_delete: Vec<String>,
) -> Result<(Self, StdResult<u64, UserError>)> {
puffin::profile_function!();
// Early return when there is no document to add
if to_delete.is_empty() {
// Maintains Invariant: remove documents actually always returns Ok for the inner result
@ -222,8 +219,6 @@ where
mut self,
to_delete: &RoaringBitmap,
) -> Result<(Self, u64)> {
puffin::profile_function!();
// Early return when there is no document to add
if to_delete.is_empty() {
return Ok((self, 0));
@ -248,8 +243,6 @@ where
name = "index_documents"
)]
pub fn execute(mut self) -> Result<DocumentAdditionResult> {
puffin::profile_function!();
if self.added_documents == 0 && self.deleted_documents == 0 {
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
@ -278,8 +271,6 @@ where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let TransformOutput {
primary_key,
mut settings_diff,
@ -337,7 +328,10 @@ where
let min_chunk_size = 1024 * 512; // 512KiB
// compute the chunk size from the number of available threads and the inputed data size.
let total_size = flattened_documents.metadata().map(|m| m.len());
let total_size = match flattened_documents.as_ref() {
Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()),
None => Ok(default_chunk_size as u64),
};
let current_num_threads = pool.current_num_threads();
// if we have more than 2 thread, create a number of chunk equal to 3/4 threads count
let chunk_count = if current_num_threads > 2 {
@ -351,8 +345,14 @@ where
}
};
let original_documents = grenad::Reader::new(original_documents)?;
let flattened_documents = grenad::Reader::new(flattened_documents)?;
let original_documents = match original_documents {
Some(original_documents) => Some(grenad::Reader::new(original_documents)?),
None => None,
};
let flattened_documents = match flattened_documents {
Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?),
None => None,
};
let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;
@ -371,15 +371,23 @@ where
pool.install(|| {
rayon::spawn(move || {
let child_span = tracing::trace_span!(target: "indexing::details", parent: &current_span, "extract_and_send_grenad_chunks");
let _enter = child_span.enter();
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
let _enter = child_span.enter();
// split obkv file into several chunks
let flattened_chunk_iter =
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);
let original_chunk_iter = match original_documents {
Some(original_documents) => {
grenad_obkv_into_chunks(original_documents,pool_params,documents_chunk_size).map(either::Left)
},
None => Ok(either::Right(iter::empty())),
};
// split obkv file into several chunks
let flattened_chunk_iter = match flattened_documents {
Some(flattened_documents) => {
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left)
},
None => Ok(either::Right(iter::empty())),
};
let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?;
@ -533,7 +541,7 @@ where
let writer_index = (embedder_index as u16) << 8;
for k in 0..=u8::MAX {
let writer =
arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension)?;
arroy::Writer::new(vector_arroy, writer_index | (k as u16), dimension);
if writer.is_empty(wtxn)? {
break;
}
@ -571,8 +579,6 @@ where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
// Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT;
@ -616,7 +622,6 @@ where
{
let span = tracing::trace_span!(target: "indexing::details", "compute_prefix_diffs");
let _entered = span.enter();
puffin::profile_scope!("compute_prefix_diffs");
current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
@ -756,8 +761,6 @@ fn execute_word_prefix_docids(
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db);
builder.chunk_compression_type = indexer_config.chunk_compression_type;
builder.chunk_compression_level = indexer_config.chunk_compression_level;
@ -3237,6 +3240,7 @@ mod tests {
}
#[test]
#[cfg(feature = "all-tokenizations")]
fn stored_detected_script_and_language_should_not_return_deleted_documents() {
use charabia::{Language, Script};
let index = TempIndex::new();

View file

@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::collections::btree_map::Entry as BEntry;
use std::collections::hash_map::Entry as HEntry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{Read, Seek};
@ -20,21 +20,21 @@ use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key};
use crate::update::del_add::{
del_add_from_two_obkvs, into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd,
};
use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd};
use crate::update::index_documents::GrenadParameters;
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result};
use crate::{
is_faceted_by, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result,
};
pub struct TransformOutput {
pub primary_key: String,
pub settings_diff: InnerIndexSettingsDiff,
pub field_distribution: FieldDistribution,
pub documents_count: usize,
pub original_documents: File,
pub flattened_documents: File,
pub original_documents: Option<File>,
pub flattened_documents: Option<File>,
}
/// Extract the external ids, deduplicate and compute the new internal documents ids
@ -161,8 +161,6 @@ impl<'a, 'i> Transform<'a, 'i> {
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
let external_documents_ids = self.index.external_documents_ids();
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
@ -375,8 +373,6 @@ impl<'a, 'i> Transform<'a, 'i> {
where
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
// there may be duplicates in the documents to remove.
to_remove.sort_unstable();
to_remove.dedup();
@ -466,8 +462,6 @@ impl<'a, 'i> Transform<'a, 'i> {
where
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
let mut documents_deleted = 0;
let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
@ -686,8 +680,6 @@ impl<'a, 'i> Transform<'a, 'i> {
where
F: Fn(UpdateIndexingStep) + Sync,
{
puffin::profile_function!();
let primary_key = self
.index
.primary_key(wtxn)?
@ -808,11 +800,15 @@ impl<'a, 'i> Transform<'a, 'i> {
})?;
let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?;
let fields_ids_map = self.fields_ids_map;
let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name));
let mut new_inner_settings = old_inner_settings.clone();
new_inner_settings.fields_ids_map = self.fields_ids_map;
new_inner_settings.fields_ids_map = fields_ids_map;
let settings_diff = InnerIndexSettingsDiff {
old: old_inner_settings,
new: new_inner_settings,
primary_key_id,
embedding_configs_updated: false,
settings_update_only: false,
};
@ -822,10 +818,12 @@ impl<'a, 'i> Transform<'a, 'i> {
settings_diff,
field_distribution,
documents_count: self.documents_count,
original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
flattened_documents: flattened_documents
.into_inner()
.map_err(|err| err.into_error())?,
original_documents: Some(
original_documents.into_inner().map_err(|err| err.into_error())?,
),
flattened_documents: Some(
flattened_documents.into_inner().map_err(|err| err.into_error())?,
),
})
}
@ -835,34 +833,66 @@ impl<'a, 'i> Transform<'a, 'i> {
fn rebind_existing_document(
old_obkv: KvReader<FieldId>,
settings_diff: &InnerIndexSettingsDiff,
original_obkv_buffer: &mut Vec<u8>,
flattened_obkv_buffer: &mut Vec<u8>,
modified_faceted_fields: &HashSet<String>,
original_obkv_buffer: Option<&mut Vec<u8>>,
flattened_obkv_buffer: Option<&mut Vec<u8>>,
) -> Result<()> {
let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone();
let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone();
// Always keep the primary key.
let is_primary_key = |id: FieldId| -> bool { settings_diff.primary_key_id == Some(id) };
// If only the `searchableAttributes` has been changed, keep only the searchable fields.
let must_reindex_searchables = settings_diff.reindex_searchable();
let necessary_searchable_field = |id: FieldId| -> bool {
must_reindex_searchables
&& (settings_diff.old.searchable_fields_ids.contains(&id)
|| settings_diff.new.searchable_fields_ids.contains(&id))
};
// If only a faceted field has been added, keep only this field.
let must_reindex_facets = settings_diff.reindex_facets();
let necessary_faceted_field = |id: FieldId| -> bool {
let field_name = settings_diff.new.fields_ids_map.name(id).unwrap();
must_reindex_facets
&& modified_faceted_fields
.iter()
.any(|long| is_faceted_by(long, field_name) || is_faceted_by(field_name, long))
};
// Alway provide all fields when vectors are involved because
// we need the fields for the prompt/templating.
let reindex_vectors = settings_diff.reindex_vectors();
let mut obkv_writer = KvWriter::<_, FieldId>::memory();
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
for (id, name) in new_fields_ids_map.iter() {
if let Some(val) = old_fields_ids_map.id(name).and_then(|id| old_obkv.get(id)) {
for (id, val) in old_obkv.iter() {
if is_primary_key(id)
|| necessary_searchable_field(id)
|| necessary_faceted_field(id)
|| reindex_vectors
{
obkv_writer.insert(id, val)?;
}
}
let data = obkv_writer.into_inner()?;
let new_obkv = KvReader::<FieldId>::new(&data);
let obkv = KvReader::<FieldId>::new(&data);
// take the non-flattened version if flatten_from_fields_ids_map returns None.
let old_flattened = Self::flatten_from_fields_ids_map(&old_obkv, &mut old_fields_ids_map)?;
let old_flattened =
old_flattened.as_deref().map_or_else(|| old_obkv, KvReader::<FieldId>::new);
let new_flattened = Self::flatten_from_fields_ids_map(&new_obkv, &mut new_fields_ids_map)?;
let new_flattened =
new_flattened.as_deref().map_or_else(|| new_obkv, KvReader::<FieldId>::new);
if let Some(original_obkv_buffer) = original_obkv_buffer {
original_obkv_buffer.clear();
into_del_add_obkv(obkv, DelAddOperation::DeletionAndAddition, original_obkv_buffer)?;
}
original_obkv_buffer.clear();
flattened_obkv_buffer.clear();
if let Some(flattened_obkv_buffer) = flattened_obkv_buffer {
// take the non-flattened version if flatten_from_fields_ids_map returns None.
let mut fields_ids_map = settings_diff.new.fields_ids_map.clone();
let flattened = Self::flatten_from_fields_ids_map(&obkv, &mut fields_ids_map)?;
let flattened = flattened.as_deref().map_or(obkv, KvReader::new);
del_add_from_two_obkvs(&old_obkv, &new_obkv, original_obkv_buffer)?;
del_add_from_two_obkvs(&old_flattened, &new_flattened, flattened_obkv_buffer)?;
flattened_obkv_buffer.clear();
into_del_add_obkv(
flattened,
DelAddOperation::DeletionAndAddition,
flattened_obkv_buffer,
)?;
}
Ok(())
}
@ -891,46 +921,63 @@ impl<'a, 'i> Transform<'a, 'i> {
let documents_count = documents_ids.len() as usize;
// We initialize the sorter with the user indexing settings.
let mut original_sorter = create_sorter(
grenad::SortAlgorithm::Stable,
keep_first,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
);
let mut original_sorter = if settings_diff.reindex_vectors() {
Some(create_sorter(
grenad::SortAlgorithm::Stable,
keep_first,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
))
} else {
None
};
// We initialize the sorter with the user indexing settings.
let mut flattened_sorter = create_sorter(
grenad::SortAlgorithm::Stable,
keep_first,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
);
let mut flattened_sorter =
if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
Some(create_sorter(
grenad::SortAlgorithm::Stable,
keep_first,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
))
} else {
None
};
let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
for result in self.index.external_documents_ids().iter(wtxn)? {
let (external_id, docid) = result?;
let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
if original_sorter.is_some() || flattened_sorter.is_some() {
let modified_faceted_fields = settings_diff.modified_faceted_fields();
let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
for result in self.index.external_documents_ids().iter(wtxn)? {
let (external_id, docid) = result?;
let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
Self::rebind_existing_document(
old_obkv,
&settings_diff,
&mut original_obkv_buffer,
&mut flattened_obkv_buffer,
)?;
Self::rebind_existing_document(
old_obkv,
&settings_diff,
&modified_faceted_fields,
Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()),
Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()),
)?;
document_sorter_key_buffer.clear();
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
if let Some(original_sorter) = original_sorter.as_mut() {
document_sorter_key_buffer.clear();
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
}
if let Some(flattened_sorter) = flattened_sorter.as_mut() {
flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
}
}
}
let grenad_params = GrenadParameters {
@ -941,17 +988,22 @@ impl<'a, 'i> Transform<'a, 'i> {
};
// Once we have written all the documents, we merge everything into a Reader.
let original_documents = sorter_into_reader(original_sorter, grenad_params)?;
let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;
let flattened_documents = match flattened_sorter {
Some(flattened_sorter) => Some(sorter_into_reader(flattened_sorter, grenad_params)?),
None => None,
};
let original_documents = match original_sorter {
Some(original_sorter) => Some(sorter_into_reader(original_sorter, grenad_params)?),
None => None,
};
Ok(TransformOutput {
primary_key,
field_distribution,
settings_diff,
documents_count,
original_documents: original_documents.into_inner().into_inner(),
flattened_documents: flattened_documents.into_inner().into_inner(),
original_documents: original_documents.map(|od| od.into_inner().into_inner()),
flattened_documents: flattened_documents.map(|fd| fd.into_inner().into_inner()),
})
}
}

View file

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::convert::TryInto;
use std::fs::File;
use std::io::{self, BufReader};
@ -118,65 +118,6 @@ impl TypedChunk {
}
}
impl TypedChunk {
pub fn to_debug_string(&self) -> String {
match self {
TypedChunk::FieldIdDocidFacetStrings(grenad) => {
format!("FieldIdDocidFacetStrings {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdDocidFacetNumbers(grenad) => {
format!("FieldIdDocidFacetNumbers {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::Documents(grenad) => {
format!("Documents {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdWordCountDocids(grenad) => {
format!("FieldIdWordcountDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => format!(
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {}, word_fid_docids_reader: {} }}",
word_docids_reader.len(),
exact_word_docids_reader.len(),
word_fid_docids_reader.len()
),
TypedChunk::WordPositionDocids(grenad) => {
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordPairProximityDocids(grenad) => {
format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetStringDocids((grenad, _)) => {
format!("FieldIdFacetStringDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetNumberDocids(grenad) => {
format!("FieldIdFacetNumberDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetExistsDocids(grenad) => {
format!("FieldIdFacetExistsDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsNullDocids(grenad) => {
format!("FieldIdFacetIsNullDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdFacetIsEmptyDocids(grenad) => {
format!("FieldIdFacetIsEmptyDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::GeoPoints(grenad) => {
format!("GeoPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::VectorPoints{ remove_vectors, manual_vectors, embeddings, expected_dimension, embedder_name } => {
format!("VectorPoints {{ remove_vectors: {}, manual_vectors: {}, embeddings: {}, dimension: {}, embedder_name: {} }}", remove_vectors.len(), manual_vectors.len(), embeddings.as_ref().map(|e| e.len()).unwrap_or_default(), expected_dimension, embedder_name)
}
TypedChunk::ScriptLanguageDocids(sl_map) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", sl_map.len())
}
}
}
}
/// Write typed chunk in the corresponding LMDB database of the provided index.
/// Return new documents seen.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
@ -185,14 +126,16 @@ pub(crate) fn write_typed_chunk_into_index(
index: &Index,
wtxn: &mut RwTxn,
) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunks[0].to_debug_string());
let mut is_merged_database = false;
match typed_chunks[0] {
TypedChunk::Documents(_) => {
let span = tracing::trace_span!(target: "indexing::write_db", "documents");
let _entered = span.enter();
let fields_ids_map = index.fields_ids_map(wtxn)?;
let vectors_fid =
fields_ids_map.id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME);
let mut builder = MergerBuilder::new(keep_latest_obkv as MergeFn);
for typed_chunk in typed_chunks {
let TypedChunk::Documents(chunk) = typed_chunk else {
@ -206,6 +149,10 @@ pub(crate) fn write_typed_chunk_into_index(
let mut docids = index.documents_ids(wtxn)?;
let mut iter = merger.into_stream_merger_iter()?;
let embedders: BTreeSet<_> =
index.embedding_configs(wtxn)?.into_iter().map(|(k, _v)| k).collect();
let mut vectors_buffer = Vec::new();
while let Some((key, reader)) = iter.next()? {
let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
let reader: KvReader<FieldId> = KvReader::new(reader);
@ -219,7 +166,35 @@ pub(crate) fn write_typed_chunk_into_index(
let del_add_reader = KvReaderDelAdd::new(value);
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
writer.insert(field_id, addition)?;
let addition = if vectors_fid == Some(field_id) {
'vectors: {
vectors_buffer.clear();
let Ok(mut vectors) =
crate::vector::parsed_vectors::ParsedVectors::from_bytes(
addition,
)
else {
// if the `_vectors` field cannot be parsed as map of vectors, just write it as-is
break 'vectors Some(addition);
};
vectors.retain_user_provided_vectors(&embedders);
let crate::vector::parsed_vectors::ParsedVectors(vectors) = vectors;
if vectors.is_empty() {
// skip writing empty `_vectors` map
break 'vectors None;
}
serde_json::to_writer(&mut vectors_buffer, &vectors)
.map_err(InternalError::SerdeJson)?;
Some(vectors_buffer.as_slice())
}
} else {
Some(addition)
};
if let Some(addition) = addition {
writer.insert(field_id, addition)?;
}
}
}
@ -661,7 +636,7 @@ pub(crate) fn write_typed_chunk_into_index(
)?;
let writer_index = (embedder_index as u16) << 8;
// FIXME: allow customizing distance
let writers: std::result::Result<Vec<_>, _> = (0..=u8::MAX)
let writers: Vec<_> = (0..=u8::MAX)
.map(|k| {
arroy::Writer::new(
index.vector_arroy,
@ -670,7 +645,6 @@ pub(crate) fn write_typed_chunk_into_index(
)
})
.collect();
let writers = writers?;
// remove vectors for docids we want them removed
let merger = remove_vectors_builder.build();
@ -842,7 +816,6 @@ where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
{
puffin::profile_function!();
let mut buffer = Vec::new();
let database = database.remap_types::<Bytes, Bytes>();

View file

@ -398,8 +398,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
puffin::profile_function!();
// if the settings are set before any document update, we don't need to do anything, and
// will set the primary key during the first document addition.
if self.index.number_of_documents(self.wtxn)? == 0 {
@ -461,50 +459,39 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(true)
}
/// Updates the index's searchable attributes. This causes the field map to be recomputed to
/// reflect the order of the searchable attributes.
/// Updates the index's searchable attributes.
fn update_searchable(&mut self) -> Result<bool> {
match self.searchable_fields {
Setting::Set(ref fields) => {
// Check to see if the searchable fields changed before doing anything else
let old_fields = self.index.searchable_fields(self.wtxn)?;
let did_change = match old_fields {
// If old_fields is Some, let's check to see if the fields actually changed
Some(old_fields) => {
let new_fields = fields.iter().map(String::as_str).collect::<Vec<_>>();
new_fields != old_fields
}
// If old_fields is None, the fields have changed (because they are being set)
None => true,
let did_change = {
let new_fields = fields.iter().map(String::as_str).collect::<Vec<_>>();
new_fields != old_fields
};
if !did_change {
return Ok(false);
}
// every time the searchable attributes are updated, we need to update the
// ids for any settings that uses the facets. (distinct_fields, filterable_fields).
let old_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
let mut new_fields_ids_map = FieldsIdsMap::new();
// Since we're updating the settings we can only add new fields at the end of the field id map
let mut fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
// fields are deduplicated, only the first occurrence is taken into account
let names = fields.iter().unique().map(String::as_str).collect::<Vec<_>>();
// Add all the searchable attributes to the field map, and then add the
// remaining fields from the old field map to the new one
for name in names.iter() {
new_fields_ids_map.insert(name).ok_or(UserError::AttributeLimitReached)?;
}
for (_, name) in old_fields_ids_map.iter() {
new_fields_ids_map.insert(name).ok_or(UserError::AttributeLimitReached)?;
// The fields ids map won't change the field id of already present elements thus only the
// new fields will be inserted.
fields_ids_map.insert(name).ok_or(UserError::AttributeLimitReached)?;
}
self.index.put_all_searchable_fields_from_fields_ids_map(
self.wtxn,
&names,
&new_fields_ids_map,
&fields_ids_map,
)?;
self.index.put_fields_ids_map(self.wtxn, &new_fields_ids_map)?;
self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
Ok(true)
}
Setting::Reset => Ok(self.index.delete_all_searchable_fields(self.wtxn)?),
@ -1078,10 +1065,17 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
// 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage
let embedding_configs_updated = self.update_embedding_configs()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?;
let mut new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?;
new_inner_settings.recompute_facets(self.wtxn, self.index)?;
let primary_key_id = self
.index
.primary_key(self.wtxn)?
.and_then(|name| new_inner_settings.fields_ids_map.id(name));
let inner_settings_diff = InnerIndexSettingsDiff {
old: old_inner_settings,
new: new_inner_settings,
primary_key_id,
embedding_configs_updated,
settings_update_only: true,
};
@ -1097,10 +1091,9 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
pub struct InnerIndexSettingsDiff {
pub(crate) old: InnerIndexSettings,
pub(crate) new: InnerIndexSettings,
pub(crate) primary_key_id: Option<FieldId>,
// TODO: compare directly the embedders.
pub(crate) embedding_configs_updated: bool,
pub(crate) settings_update_only: bool,
}
@ -1110,13 +1103,8 @@ impl InnerIndexSettingsDiff {
}
pub fn reindex_searchable(&self) -> bool {
self.old
.fields_ids_map
.iter()
.zip(self.new.fields_ids_map.iter())
.any(|(old, new)| old != new)
|| self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
!= self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
!= self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
|| self.old.allowed_separators != self.new.allowed_separators
|| self.old.dictionary != self.new.dictionary
|| self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields
@ -1143,15 +1131,7 @@ impl InnerIndexSettingsDiff {
return true;
}
let faceted_updated =
(existing_fields - old_faceted_fields) != (existing_fields - new_faceted_fields);
self.old
.fields_ids_map
.iter()
.zip(self.new.fields_ids_map.iter())
.any(|(old, new)| old != new)
|| faceted_updated
(existing_fields - old_faceted_fields) != (existing_fields - new_faceted_fields)
}
pub fn reindex_vectors(&self) -> bool {
@ -1181,7 +1161,7 @@ pub(crate) struct InnerIndexSettings {
pub user_defined_faceted_fields: HashSet<String>,
pub user_defined_searchable_fields: Option<Vec<String>>,
pub faceted_fields_ids: HashSet<FieldId>,
pub searchable_fields_ids: Option<Vec<FieldId>>,
pub searchable_fields_ids: Vec<FieldId>,
pub exact_attributes: HashSet<FieldId>,
pub proximity_precision: ProximityPrecision,
pub embedding_configs: EmbeddingConfigs,
@ -1262,18 +1242,21 @@ impl InnerIndexSettings {
// find and insert the new field ids
pub fn recompute_searchables(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> {
let searchable_fields = self
.user_defined_searchable_fields
.as_ref()
.map(|searchable| searchable.iter().map(|s| s.as_str()).collect::<Vec<_>>());
// in case new fields were introduced we're going to recreate the searchable fields.
if let Some(searchable_fields) = self.user_defined_searchable_fields.as_ref() {
let searchable_fields =
searchable_fields.iter().map(String::as_ref).collect::<Vec<_>>();
if let Some(searchable_fields) = searchable_fields {
index.put_all_searchable_fields_from_fields_ids_map(
wtxn,
&searchable_fields,
&self.fields_ids_map,
)?;
let searchable_fields_ids = index.searchable_fields_ids(wtxn)?;
self.searchable_fields_ids = searchable_fields_ids;
}
let searchable_fields_ids = index.searchable_fields_ids(wtxn)?;
self.searchable_fields_ids = searchable_fields_ids;
Ok(())
}
@ -1546,12 +1529,13 @@ mod tests {
use big_s::S;
use heed::types::Bytes;
use maplit::{btreemap, btreeset, hashset};
use meili_snap::snapshot;
use super::*;
use crate::error::Error;
use crate::index::tests::TempIndex;
use crate::update::ClearDocuments;
use crate::{Criterion, Filter, SearchResult};
use crate::{db_snap, Criterion, Filter, SearchResult};
#[test]
fn set_and_reset_searchable_fields() {
@ -1580,6 +1564,17 @@ mod tests {
wtxn.commit().unwrap();
db_snap!(index, fields_ids_map, @r###"
0 id |
1 name |
2 age |
"###);
db_snap!(index, searchable_fields, @r###"["name"]"###);
db_snap!(index, fieldids_weights_map, @r###"
fid weight
1 0 |
"###);
// Check that the searchable field is correctly set to "name" only.
let rtxn = index.read_txn().unwrap();
// When we search for something that is not in
@ -1591,8 +1586,9 @@ mod tests {
// we must find the appropriate document.
let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap();
let documents = index.documents(&rtxn, result.documents_ids).unwrap();
let fid_map = index.fields_ids_map(&rtxn).unwrap();
assert_eq!(documents.len(), 1);
assert_eq!(documents[0].1.get(0), Some(&br#""kevin""#[..]));
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
drop(rtxn);
// We change the searchable fields to be the "name" field only.
@ -1602,14 +1598,31 @@ mod tests {
})
.unwrap();
db_snap!(index, fields_ids_map, @r###"
0 id |
1 name |
2 age |
"###);
db_snap!(index, searchable_fields, @r###"["id", "name", "age"]"###);
db_snap!(index, fieldids_weights_map, @r###"
fid weight
0 0 |
1 0 |
2 0 |
"###);
// Check that the searchable field have been reset and documents are found now.
let rtxn = index.read_txn().unwrap();
let fid_map = index.fields_ids_map(&rtxn).unwrap();
let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn).unwrap();
snapshot!(format!("{user_defined_searchable_fields:?}"), @"None");
// the searchable fields should contain all the fields
let searchable_fields = index.searchable_fields(&rtxn).unwrap();
assert_eq!(searchable_fields, None);
snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###);
let result = index.search(&rtxn).query("23").execute().unwrap();
assert_eq!(result.documents_ids.len(), 1);
let documents = index.documents(&rtxn, result.documents_ids).unwrap();
assert_eq!(documents[0].1.get(0), Some(&br#""kevin""#[..]));
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
}
#[test]

View file

@ -52,8 +52,6 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(

View file

@ -57,7 +57,6 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
puffin::profile_function!();
debug!("Computing and writing the word levels integers docids into LMDB on disk...");
let mut prefix_integer_docids_sorter = create_sorter(

View file

@ -45,8 +45,6 @@ impl<'t, 'i> WordsPrefixesFst<'t, 'i> {
name = "words_prefix_fst"
)]
pub fn execute(self) -> Result<()> {
puffin::profile_function!();
let words_fst = self.index.words_fst(self.wtxn)?;
let mut current_prefix = vec![SmallString32::new(); self.max_prefix_length];