mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 03:47:02 +02:00
fix tests
# Conflicts: # crates/index-scheduler/src/batch.rs # crates/index-scheduler/src/snapshots/lib.rs/fail_in_process_batch_for_document_deletion/after_removing_the_documents.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_bad_primary_key/fifth_task_succeeds.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_bad_primary_key/fourth_task_fails.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key/second_task_fails.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key/third_task_fails.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_multiple_primary_key_batch_wrong_key/second_and_third_tasks_fails.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/all_other_tasks_succeeds.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/second_task_fails.snap # crates/index-scheduler/src/snapshots/lib.rs/test_document_addition_with_set_and_null_primary_key_inference_works/third_task_succeeds.snap # Conflicts: # crates/index-scheduler/src/batch.rs # crates/meilisearch/src/search/mod.rs # crates/meilisearch/tests/vector/mod.rs # Conflicts: # crates/index-scheduler/src/batch.rs
This commit is contained in:
parent
95ed079761
commit
b75f1f4c17
24 changed files with 378 additions and 281 deletions
|
@ -29,7 +29,6 @@ use bumpalo::collections::CollectIn;
|
|||
use bumpalo::Bump;
|
||||
use dump::IndexMetadata;
|
||||
use meilisearch_types::batches::BatchId;
|
||||
use meilisearch_types::error::Code;
|
||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
|
||||
use meilisearch_types::milli::heed::CompactionOption;
|
||||
|
@ -689,7 +688,9 @@ impl IndexScheduler {
|
|||
let index = self.index_mapper.index(&rtxn, name)?;
|
||||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||
fs::create_dir_all(&dst)?;
|
||||
index.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
index
|
||||
.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)
|
||||
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
|
||||
}
|
||||
|
||||
drop(rtxn);
|
||||
|
@ -791,16 +792,19 @@ impl IndexScheduler {
|
|||
let content_file = self.file_store.get_update(content_file)?;
|
||||
|
||||
let reader = DocumentsBatchReader::from_reader(content_file)
|
||||
.map_err(milli::Error::from)?;
|
||||
.map_err(|e| Error::from_milli(e.into(), None))?;
|
||||
|
||||
let (mut cursor, documents_batch_index) =
|
||||
reader.into_cursor_and_fields_index();
|
||||
|
||||
while let Some(doc) =
|
||||
cursor.next_document().map_err(milli::Error::from)?
|
||||
while let Some(doc) = cursor
|
||||
.next_document()
|
||||
.map_err(|e| Error::from_milli(e.into(), None))?
|
||||
{
|
||||
dump_content_file
|
||||
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
|
||||
dump_content_file.push_document(
|
||||
&obkv_to_object(doc, &documents_batch_index)
|
||||
.map_err(|e| Error::from_milli(e, None))?,
|
||||
)?;
|
||||
}
|
||||
dump_content_file.flush()?;
|
||||
}
|
||||
|
@ -814,27 +818,41 @@ impl IndexScheduler {
|
|||
let metadata = IndexMetadata {
|
||||
uid: uid.to_owned(),
|
||||
primary_key: index.primary_key(&rtxn)?.map(String::from),
|
||||
created_at: index.created_at(&rtxn)?,
|
||||
updated_at: index.updated_at(&rtxn)?,
|
||||
created_at: index
|
||||
.created_at(&rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?,
|
||||
updated_at: index
|
||||
.updated_at(&rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?,
|
||||
};
|
||||
let mut index_dumper = dump.create_index(uid, &metadata)?;
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||
let embedding_configs = index.embedding_configs(&rtxn)?;
|
||||
let embedding_configs = index
|
||||
.embedding_configs(&rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||
|
||||
let documents = index
|
||||
.all_documents(&rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||
// 3.1. Dump the documents
|
||||
for ret in index.all_documents(&rtxn)? {
|
||||
for ret in documents {
|
||||
if self.must_stop_processing.get() {
|
||||
return Err(Error::AbortedTask);
|
||||
}
|
||||
|
||||
let (id, doc) = ret?;
|
||||
let (id, doc) =
|
||||
ret.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||
|
||||
let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
|
||||
let mut document =
|
||||
milli::obkv_to_json(&all_fields, &fields_ids_map, doc)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||
|
||||
'inject_vectors: {
|
||||
let embeddings = index.embeddings(&rtxn, id)?;
|
||||
let embeddings = index
|
||||
.embeddings(&rtxn, id)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||
|
||||
if embeddings.is_empty() {
|
||||
break 'inject_vectors;
|
||||
|
@ -845,7 +863,7 @@ impl IndexScheduler {
|
|||
.or_insert(serde_json::Value::Object(Default::default()));
|
||||
|
||||
let serde_json::Value::Object(vectors) = vectors else {
|
||||
return Err(milli::Error::UserError(
|
||||
let user_err = milli::Error::UserError(
|
||||
milli::UserError::InvalidVectorsMapType {
|
||||
document_id: {
|
||||
if let Ok(Some(Ok(index))) = index
|
||||
|
@ -859,8 +877,9 @@ impl IndexScheduler {
|
|||
},
|
||||
value: vectors.clone(),
|
||||
},
|
||||
)
|
||||
.into());
|
||||
);
|
||||
|
||||
return Err(Error::from_milli(user_err, Some(uid.to_string())));
|
||||
};
|
||||
|
||||
for (embedder_name, embeddings) in embeddings {
|
||||
|
@ -890,7 +909,8 @@ impl IndexScheduler {
|
|||
index,
|
||||
&rtxn,
|
||||
meilisearch_types::settings::SecretPolicy::RevealSecrets,
|
||||
)?;
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
|
||||
index_dumper.settings(&settings)?;
|
||||
Ok(())
|
||||
})?;
|
||||
|
@ -946,7 +966,8 @@ impl IndexScheduler {
|
|||
// the entire batch.
|
||||
let res = || -> Result<()> {
|
||||
let index_rtxn = index.read_txn()?;
|
||||
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
|
||||
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
|
||||
wtxn.commit()?;
|
||||
|
@ -988,10 +1009,12 @@ impl IndexScheduler {
|
|||
);
|
||||
builder.set_primary_key(primary_key);
|
||||
let must_stop_processing = self.must_stop_processing.clone();
|
||||
builder.execute(
|
||||
|indexing_step| tracing::debug!(update = ?indexing_step),
|
||||
|| must_stop_processing.get(),
|
||||
)?;
|
||||
builder
|
||||
.execute(
|
||||
|indexing_step| tracing::debug!(update = ?indexing_step),
|
||||
|| must_stop_processing.get(),
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
|
||||
index_wtxn.commit()?;
|
||||
}
|
||||
|
||||
|
@ -1008,7 +1031,8 @@ impl IndexScheduler {
|
|||
let res = || -> Result<()> {
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let index_rtxn = index.read_txn()?;
|
||||
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
|
||||
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
|
||||
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
|
||||
wtxn.commit()?;
|
||||
Ok(())
|
||||
|
@ -1031,7 +1055,9 @@ impl IndexScheduler {
|
|||
let number_of_documents = || -> Result<u64> {
|
||||
let index = self.index_mapper.index(&wtxn, &index_uid)?;
|
||||
let index_rtxn = index.read_txn()?;
|
||||
Ok(index.number_of_documents(&index_rtxn)?)
|
||||
index
|
||||
.number_of_documents(&index_rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))
|
||||
}()
|
||||
.unwrap_or_default();
|
||||
|
||||
|
@ -1188,8 +1214,10 @@ impl IndexScheduler {
|
|||
};
|
||||
|
||||
match operation {
|
||||
IndexOperation::DocumentClear { mut tasks, .. } => {
|
||||
let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?;
|
||||
IndexOperation::DocumentClear { index_uid, mut tasks } => {
|
||||
let count = milli::update::ClearDocuments::new(index_wtxn, index)
|
||||
.execute()
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid)))?;
|
||||
|
||||
let mut first_clear_found = false;
|
||||
for task in &mut tasks {
|
||||
|
@ -1209,7 +1237,7 @@ impl IndexScheduler {
|
|||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::DocumentOperation {
|
||||
index_uid: _,
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
operations,
|
||||
|
@ -1235,13 +1263,17 @@ impl IndexScheduler {
|
|||
|
||||
let mut content_files_iter = content_files.iter();
|
||||
let mut indexer = indexer::DocumentOperation::new(method);
|
||||
let embedders = index.embedding_configs(index_wtxn)?;
|
||||
let embedders = self.embedders(embedders)?;
|
||||
let embedders = index
|
||||
.embedding_configs(index_wtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
|
||||
let embedders = self.embedders(index_uid.clone(), embedders)?;
|
||||
for operation in operations {
|
||||
match operation {
|
||||
DocumentOperation::Add(_content_uuid) => {
|
||||
let mmap = content_files_iter.next().unwrap();
|
||||
indexer.add_documents(mmap)?;
|
||||
indexer
|
||||
.add_documents(mmap)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
|
||||
}
|
||||
DocumentOperation::Delete(document_ids) => {
|
||||
let document_ids: bumpalo::collections::vec::Vec<_> = document_ids
|
||||
|
@ -1266,15 +1298,17 @@ impl IndexScheduler {
|
|||
}
|
||||
};
|
||||
|
||||
let (document_changes, operation_stats, primary_key) = indexer.into_changes(
|
||||
&indexer_alloc,
|
||||
index,
|
||||
&rtxn,
|
||||
primary_key.as_deref(),
|
||||
&mut new_fields_ids_map,
|
||||
&|| must_stop_processing.get(),
|
||||
&send_progress,
|
||||
)?;
|
||||
let (document_changes, operation_stats, primary_key) = indexer
|
||||
.into_changes(
|
||||
&indexer_alloc,
|
||||
index,
|
||||
&rtxn,
|
||||
primary_key.as_deref(),
|
||||
&mut new_fields_ids_map,
|
||||
&|| must_stop_processing.get(),
|
||||
&send_progress,
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
|
||||
|
||||
let mut addition = 0;
|
||||
for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
|
||||
|
@ -1321,14 +1355,15 @@ impl IndexScheduler {
|
|||
embedders,
|
||||
&|| must_stop_processing.get(),
|
||||
&send_progress,
|
||||
)?;
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
|
||||
|
||||
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::DocumentEdition { mut task, .. } => {
|
||||
IndexOperation::DocumentEdition { index_uid, mut task } => {
|
||||
let (filter, code) = if let KindWithContent::DocumentEdition {
|
||||
filter_expr,
|
||||
context: _,
|
||||
|
@ -1342,16 +1377,11 @@ impl IndexScheduler {
|
|||
};
|
||||
|
||||
let candidates = match filter.as_ref().map(Filter::from_json) {
|
||||
Some(Ok(Some(filter))) => {
|
||||
filter.evaluate(index_wtxn, index).map_err(|err| match err {
|
||||
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
||||
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
|
||||
}
|
||||
e => e.into(),
|
||||
})?
|
||||
}
|
||||
Some(Ok(Some(filter))) => filter
|
||||
.evaluate(index_wtxn, index)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
|
||||
None | Some(Ok(None)) => index.documents_ids(index_wtxn)?,
|
||||
Some(Err(e)) => return Err(e.into()),
|
||||
Some(Err(e)) => return Err(Error::from_milli(e, Some(index_uid.clone()))),
|
||||
};
|
||||
|
||||
let (original_filter, context, function) = if let Some(Details::DocumentEdition {
|
||||
|
@ -1386,8 +1416,9 @@ impl IndexScheduler {
|
|||
// candidates not empty => index not empty => a primary key is set
|
||||
let primary_key = index.primary_key(&rtxn)?.unwrap();
|
||||
|
||||
let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
|
||||
.map_err(milli::Error::from)?;
|
||||
let primary_key =
|
||||
PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
|
||||
.map_err(|err| Error::from_milli(err.into(), Some(index_uid.clone())))?;
|
||||
|
||||
let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;
|
||||
|
||||
|
@ -1406,11 +1437,17 @@ impl IndexScheduler {
|
|||
};
|
||||
|
||||
let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
|
||||
let document_changes =
|
||||
pool.install(|| indexer.into_changes(&primary_key)).unwrap()?;
|
||||
|
||||
let embedders = index.embedding_configs(index_wtxn)?;
|
||||
let embedders = self.embedders(embedders)?;
|
||||
let document_changes = pool
|
||||
.install(|| {
|
||||
indexer
|
||||
.into_changes(&primary_key)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))
|
||||
})
|
||||
.unwrap()?;
|
||||
let embedders = index
|
||||
.embedding_configs(index_wtxn)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
|
||||
let embedders = self.embedders(index_uid.clone(), embedders)?;
|
||||
|
||||
indexer::index(
|
||||
index_wtxn,
|
||||
|
@ -1424,7 +1461,8 @@ impl IndexScheduler {
|
|||
embedders,
|
||||
&|| must_stop_processing.get(),
|
||||
&send_progress,
|
||||
)?;
|
||||
)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
|
||||
|
||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||
}
|
||||
|
@ -1455,7 +1493,7 @@ impl IndexScheduler {
|
|||
|
||||
Ok(vec![task])
|
||||
}
|
||||
IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => {
|
||||
IndexOperation::DocumentDeletion { mut tasks, index_uid } => {
|
||||
let mut to_delete = RoaringBitmap::new();
|
||||
let external_documents_ids = index.external_documents_ids();
|
||||
|
||||
|
@ -1476,35 +1514,23 @@ impl IndexScheduler {
|
|||
deleted_documents: Some(will_be_removed),
|
||||
});
|
||||
}
|
||||
KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => {
|
||||
KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } => {
|
||||
let before = to_delete.len();
|
||||
let filter = match Filter::from_json(filter_expr) {
|
||||
Ok(filter) => filter,
|
||||
Err(err) => {
|
||||
// theorically, this should be catched by deserr before reaching the index-scheduler and cannot happens
|
||||
task.status = Status::Failed;
|
||||
task.error = match err {
|
||||
milli::Error::UserError(
|
||||
milli::UserError::InvalidFilterExpression { .. },
|
||||
) => Some(
|
||||
Error::from(err)
|
||||
.with_custom_error_code(Code::InvalidDocumentFilter)
|
||||
.into(),
|
||||
),
|
||||
e => Some(e.into()),
|
||||
};
|
||||
task.error = Some(
|
||||
Error::from_milli(err, Some(index_uid.clone())).into(),
|
||||
);
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(filter) = filter {
|
||||
let candidates =
|
||||
filter.evaluate(index_wtxn, index).map_err(|err| match err {
|
||||
milli::Error::UserError(
|
||||
milli::UserError::InvalidFilter(_),
|
||||
) => Error::from(err)
|
||||
.with_custom_error_code(Code::InvalidDocumentFilter),
|
||||
e => e.into(),
|
||||
});
|
||||
let candidates = filter
|
||||
.evaluate(index_wtxn, index)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())));
|
||||
match candidates {
|
||||
Ok(candidates) => to_delete |= candidates,
|
||||
Err(err) => {
|
||||
|
@ -1540,8 +1566,9 @@ impl IndexScheduler {
|
|||
// to_delete not empty => index not empty => primary key set
|
||||
let primary_key = index.primary_key(&rtxn)?.unwrap();
|
||||
|
||||
let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
|
||||
.map_err(milli::Error::from)?;
|
||||
let primary_key =
|
||||
PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map)
|
||||
.map_err(|err| Error::from_milli(err.into(), Some(index_uid.clone())))?;
|
||||
|
||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||
let local_pool;
|
||||
|
@ -1560,8 +1587,10 @@ impl IndexScheduler {
|
|||
let mut indexer = indexer::DocumentDeletion::new();
|
||||
indexer.delete_documents_by_docids(to_delete);
|
||||
let document_changes = indexer.into_changes(&indexer_alloc, primary_key);
|
||||
let embedders = index.embedding_configs(index_wtxn)?;
|
||||
let embedders = self.embedders(embedders)?;
|
||||
let embedders = index
|
||||
.embedding_configs(index_wtxn)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
|
||||
let embedders = self.embedders(index_uid.clone(), embedders)?;
|
||||
|
||||
indexer::index(
|
||||
index_wtxn,
|
||||
|
@ -1575,14 +1604,15 @@ impl IndexScheduler {
|
|||
embedders,
|
||||
&|| must_stop_processing.get(),
|
||||
&send_progress,
|
||||
)?;
|
||||
)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
|
||||
|
||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
|
||||
IndexOperation::Settings { index_uid, settings, mut tasks } => {
|
||||
let indexer_config = self.index_mapper.indexer_config();
|
||||
let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
|
||||
|
||||
|
@ -1596,10 +1626,12 @@ impl IndexScheduler {
|
|||
task.status = Status::Succeeded;
|
||||
}
|
||||
|
||||
builder.execute(
|
||||
|indexing_step| tracing::debug!(update = ?indexing_step),
|
||||
|| must_stop_processing.get(),
|
||||
)?;
|
||||
builder
|
||||
.execute(
|
||||
|indexing_step| tracing::debug!(update = ?indexing_step),
|
||||
|| must_stop_processing.get(),
|
||||
)
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
use crate::TaskId;
|
||||
use meilisearch_types::batches::BatchId;
|
||||
use meilisearch_types::error::{Code, ErrorCode};
|
||||
use meilisearch_types::tasks::{Kind, Status};
|
||||
use meilisearch_types::{heed, milli};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::TaskId;
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub enum DateField {
|
||||
BeforeEnqueuedAt,
|
||||
|
@ -122,11 +121,11 @@ pub enum Error {
|
|||
Dump(#[from] dump::Error),
|
||||
#[error(transparent)]
|
||||
Heed(#[from] heed::Error),
|
||||
#[error("{}", match .index_name {
|
||||
Some(name) if !name.is_empty() => format!("Index `{}`: {error}", name),
|
||||
#[error("{}", match .index_uid {
|
||||
Some(uid) if !uid.is_empty() => format!("Index `{}`: {error}", uid),
|
||||
_ => format!("{error}")
|
||||
})]
|
||||
Milli { error: milli::Error, index_name: Option<String> },
|
||||
Milli { error: milli::Error, index_uid: Option<String> },
|
||||
#[error("An unexpected crash occurred when processing the task.")]
|
||||
ProcessBatchPanicked,
|
||||
#[error(transparent)]
|
||||
|
@ -213,8 +212,18 @@ impl Error {
|
|||
Self::WithCustomErrorCode(code, Box::new(self))
|
||||
}
|
||||
|
||||
pub fn from_milli(error: milli::Error, index_name: Option<String>) -> Self {
|
||||
Self::Milli { error, index_name }
|
||||
pub fn from_milli(err: milli::Error, index_uid: Option<String>) -> Self {
|
||||
match err {
|
||||
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
||||
Self::Milli { error: err, index_uid }
|
||||
.with_custom_error_code(Code::InvalidDocumentFilter)
|
||||
}
|
||||
milli::Error::UserError(milli::UserError::InvalidFilterExpression { .. }) => {
|
||||
Self::Milli { error: err, index_uid }
|
||||
.with_custom_error_code(Code::InvalidDocumentFilter)
|
||||
}
|
||||
_ => Self::Milli { error: err, index_uid },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,9 +8,8 @@ use time::OffsetDateTime;
|
|||
use uuid::Uuid;
|
||||
|
||||
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
|
||||
use crate::clamp_to_page_size;
|
||||
use crate::lru::{InsertionOutcome, LruMap};
|
||||
use crate::{clamp_to_page_size};
|
||||
|
||||
/// Keep an internally consistent view of the open indexes in memory.
|
||||
///
|
||||
/// This view is made of an LRU cache that will evict the least frequently used indexes when new indexes are opened.
|
||||
|
|
|
@ -3,19 +3,19 @@ use std::sync::{Arc, RwLock};
|
|||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
|
||||
use self::index_map::IndexMap;
|
||||
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
|
||||
use crate::uuid_codec::UuidCodec;
|
||||
use crate::{Error, Result};
|
||||
use meilisearch_types::heed::types::{SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::{FieldDistribution, Index};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
use meilisearch_types::milli;
|
||||
use self::index_map::IndexMap;
|
||||
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
|
||||
use crate::uuid_codec::UuidCodec;
|
||||
use crate::{Error, Result};
|
||||
|
||||
mod index_map;
|
||||
|
||||
|
@ -183,13 +183,18 @@ impl IndexMapper {
|
|||
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
|
||||
// This is very unlikely to happen in practice.
|
||||
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
||||
let index = self.index_map.write().unwrap().create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
date,
|
||||
self.enable_mdb_writemap,
|
||||
self.index_base_map_size,
|
||||
).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
|
||||
let index = self
|
||||
.index_map
|
||||
.write()
|
||||
.unwrap()
|
||||
.create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
date,
|
||||
self.enable_mdb_writemap,
|
||||
self.index_base_map_size,
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
|
@ -357,7 +362,8 @@ impl IndexMapper {
|
|||
};
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
// take the lock to reopen the environment.
|
||||
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)
|
||||
reopen
|
||||
.reopen(&mut self.index_map.write().unwrap(), &index_path)
|
||||
.map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
|
||||
continue;
|
||||
}
|
||||
|
@ -373,13 +379,15 @@ impl IndexMapper {
|
|||
Missing => {
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
|
||||
break index_map.create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
None,
|
||||
self.enable_mdb_writemap,
|
||||
self.index_base_map_size,
|
||||
).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
|
||||
break index_map
|
||||
.create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
None,
|
||||
self.enable_mdb_writemap,
|
||||
self.index_base_map_size,
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(uuid.to_string())))?;
|
||||
}
|
||||
Available(index) => break index,
|
||||
Closing(_) => {
|
||||
|
@ -460,7 +468,8 @@ impl IndexMapper {
|
|||
None => {
|
||||
let index = self.index(rtxn, index_uid)?;
|
||||
let index_rtxn = index.read_txn()?;
|
||||
IndexStats::new(&index, &index_rtxn).map_err(|e| Error::from_milli(e, Some(uuid.to_string())))
|
||||
IndexStats::new(&index, &index_rtxn)
|
||||
.map_err(|e| Error::from_milli(e, Some(uuid.to_string())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1678,8 +1678,9 @@ impl IndexScheduler {
|
|||
tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
|
||||
}
|
||||
// If we have an abortion error we must stop the tick here and re-schedule tasks.
|
||||
Err(Error::Milli{
|
||||
error: milli::Error::InternalError(milli::InternalError::AbortedIndexation), ..
|
||||
Err(Error::Milli {
|
||||
error: milli::Error::InternalError(milli::InternalError::AbortedIndexation),
|
||||
..
|
||||
})
|
||||
| Err(Error::AbortedTask) => {
|
||||
#[cfg(test)]
|
||||
|
@ -1700,7 +1701,8 @@ impl IndexScheduler {
|
|||
// 3. resize it
|
||||
// 4. re-schedule tasks
|
||||
Err(Error::Milli {
|
||||
error: milli::Error::UserError(milli::UserError::MaxDatabaseSizeReached), ..
|
||||
error: milli::Error::UserError(milli::UserError::MaxDatabaseSizeReached),
|
||||
..
|
||||
}) if index_uid.is_some() => {
|
||||
// fixme: add index_uid to match to avoid the unwrap
|
||||
let index_uid = index_uid.unwrap();
|
||||
|
@ -1954,11 +1956,12 @@ impl IndexScheduler {
|
|||
config: milli::vector::EmbeddingConfig { embedder_options, prompt, quantized },
|
||||
..
|
||||
}| {
|
||||
let prompt =
|
||||
Arc::new(prompt.try_into()
|
||||
let prompt = Arc::new(
|
||||
prompt
|
||||
.try_into()
|
||||
.map_err(meilisearch_types::milli::Error::from)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?
|
||||
);
|
||||
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
|
||||
);
|
||||
// optimistically return existing embedder
|
||||
{
|
||||
let embedders = self.embedders.read().unwrap();
|
||||
|
@ -1974,8 +1977,9 @@ impl IndexScheduler {
|
|||
let embedder = Arc::new(
|
||||
Embedder::new(embedder_options.clone())
|
||||
.map_err(meilisearch_types::milli::vector::Error::from)
|
||||
.map_err(meilisearch_types::milli::Error::from)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
|
||||
.map_err(|err| {
|
||||
Error::from_milli(err.into(), Some(index_uid.clone()))
|
||||
})?,
|
||||
);
|
||||
{
|
||||
let mut embedders = self.embedders.write().unwrap();
|
||||
|
@ -6176,7 +6180,7 @@ mod tests {
|
|||
insta::assert_json_snapshot!(simple_hf_config.embedder_options);
|
||||
let simple_hf_name = name.clone();
|
||||
|
||||
let configs = index_scheduler.embedders(configs).unwrap();
|
||||
let configs = index_scheduler.embedders("doggos".to_string(), configs).unwrap();
|
||||
let (hf_embedder, _, _) = configs.get(&simple_hf_name).unwrap();
|
||||
let beagle_embed =
|
||||
hf_embedder.embed_one(S("Intel the beagle best doggo"), None).unwrap();
|
||||
|
|
|
@ -9,8 +9,8 @@ source: crates/index-scheduler/src/lib.rs
|
|||
0 {uid: 0, batch_uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, facet_search: NotSet, prefix_search: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, facet_search: NotSet, prefix_search: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
|
||||
1 {uid: 1, batch_uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
|
||||
2 {uid: 2, batch_uid: 2, status: succeeded, details: { received_document_ids: 1, deleted_documents: Some(1) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1"] }}
|
||||
3 {uid: 3, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Invalid type for filter subexpression: expected: String, Array, found: true.", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: true, deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: Bool(true) }}
|
||||
4 {uid: 4, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Attribute `id` is not filterable. Available filterable attributes are: `catto`.\n1:3 id = 2", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: "id = 2", deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("id = 2") }}
|
||||
3 {uid: 3, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Index `doggos`: Invalid type for filter subexpression: expected: String, Array, found: true.", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: true, deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: Bool(true) }}
|
||||
4 {uid: 4, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Index `doggos`: Attribute `id` is not filterable. Available filterable attributes are: `catto`.\n1:3 id = 2", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: "id = 2", deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("id = 2") }}
|
||||
5 {uid: 5, batch_uid: 2, status: succeeded, details: { original_filter: "catto EXISTS", deleted_documents: Some(1) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("catto EXISTS") }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue