Find a temporary solution to par into iter on an HashMap

Spoiler: Do not use an HashMap but drain it into a Vec
This commit is contained in:
Clément Renault 2024-09-02 19:39:48 +02:00
parent 9b7858fb90
commit bcb1aa3d22
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
12 changed files with 254 additions and 152 deletions

View file

@ -22,19 +22,21 @@ use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
use std::io::BufWriter;
use std::sync::RwLock;
use dump::IndexMetadata;
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges};
use meilisearch_types::milli::update::{
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
};
use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
use meilisearch_types::milli::{self, Filter, Object};
use meilisearch_types::milli::{self, Filter, Object, UserError};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@ -1284,58 +1286,72 @@ impl IndexScheduler {
let must_stop_processing = self.must_stop_processing.clone();
let indexer_config = self.index_mapper.indexer_config();
if let Some(primary_key) = primary_key {
match index.primary_key(index_wtxn)? {
// if a primary key was set AND had already been defined in the index
// but to a different value, we can make the whole batch fail.
Some(pk) => {
if primary_key != pk {
return Err(milli::Error::from(
milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()),
)
.into());
}
}
// if the primary key was set and there was no primary key set for this index
// we set it to the received value before starting the indexing process.
None => {
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.set_primary_key(primary_key);
builder.execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.clone().get(),
)?;
primary_key_has_been_set = true;
}
}
}
/// TODO manage errors correctly
let rtxn = index.read_txn()?;
let first_addition_uuid = operations
.iter()
.find_map(|op| match op {
DocumentOperation::Add(content_uuid) => Some(content_uuid),
_ => None,
})
.unwrap();
let content_file = self.file_store.get_update(*first_addition_uuid)?;
let reader =
DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
let primary_key =
guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
// if let Some(primary_key) = primary_key {
// match index.primary_key(index_wtxn)? {
// // if a primary key was set AND had already been defined in the index
// // but to a different value, we can make the whole batch fail.
// Some(pk) => {
// if primary_key != pk {
// return Err(milli::Error::from(
// milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()),
// )
// .into());
// }
// }
// // if the primary key was set and there was no primary key set for this index
// // we set it to the received value before starting the indexing process.
// None => {
// todo!();
// let mut builder =
// milli::update::Settings::new(index_wtxn, index, indexer_config);
// builder.set_primary_key(primary_key);
// builder.execute(
// |indexing_step| tracing::debug!(update = ?indexing_step),
// || must_stop_processing.clone().get(),
// )?;
// primary_key_has_been_set = true;
// }
// }
// }
let embedder_configs = index.embedding_configs(index_wtxn)?;
// TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense)
let embedders = self.embedders(embedder_configs)?;
// let config = IndexDocumentsConfig { update_method: method, ..Default::default() };
let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
index,
indexer_config,
config,
|indexing_step| tracing::trace!(?indexing_step, "Update"),
|| must_stop_processing.get(),
)?;
// let embedder_configs = index.embedding_configs(index_wtxn)?;
// // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense)
// let embedders = self.embedders(embedder_configs)?;
// let mut builder = milli::update::IndexDocuments::new(
// index_wtxn,
// index,
// indexer_config,
// config,
// |indexing_step| tracing::trace!(?indexing_step, "Update"),
// || must_stop_processing.get(),
// )?;
let mut indexer = indexer::DocumentOperation::new(method);
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
match operation {
DocumentOperation::Add(content_uuid) => {
let content_file = self.file_store.get_update(content_uuid)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (new_builder, user_result) = builder.add_documents(reader)?;
builder = new_builder;
builder = builder.with_embedders(embedders.clone());
let stats = indexer.add_documents(content_file)?;
// builder = builder.with_embedders(embedders.clone());
let received_documents =
if let Some(Details::DocumentAdditionOrUpdate {
@ -1349,30 +1365,17 @@ impl IndexScheduler {
unreachable!();
};
match user_result {
Ok(count) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentAdditionOrUpdate {
received_documents,
indexed_documents: Some(count),
})
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentAdditionOrUpdate {
received_documents,
indexed_documents: Some(0),
});
task.error = Some(milli::Error::from(e).into());
}
}
task.status = Status::Succeeded;
task.details = Some(Details::DocumentAdditionOrUpdate {
received_documents,
indexed_documents: Some(stats.document_count as u64),
})
}
DocumentOperation::Delete(document_ids) => {
let (new_builder, user_result) =
builder.remove_documents(document_ids)?;
builder = new_builder;
let count = document_ids.len();
indexer.delete_documents(document_ids);
// Uses Invariant: remove documents actually always returns Ok for the inner result
let count = user_result.unwrap();
// let count = user_result.unwrap();
let provided_ids =
if let Some(Details::DocumentDeletion { provided_ids, .. }) =
task.details
@ -1386,15 +1389,26 @@ impl IndexScheduler {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(count),
deleted_documents: Some(count as u64),
});
}
}
}
if !tasks.iter().all(|res| res.error.is_some()) {
let addition = builder.execute()?;
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
let mut fields_ids_map = index.fields_ids_map(&rtxn)?;
/// TODO create a pool if needed
// let pool = indexer_config.thread_pool.unwrap();
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
// let fields_ids_map = RwLock::new(fields_ids_map);
let param = (index, &rtxn, &mut fields_ids_map, &primary_key);
let document_changes = indexer.document_changes(param)?;
indexer::index(index_wtxn, index, &pool, document_changes)?;
/// TODO we must store it or not?
let fields_ids_map = fields_ids_map;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} else if primary_key_has_been_set {
// Everything failed but we've set a primary key.
// We need to remove it.