5192: Fix empty document addition r=irevoire a=irevoire

# Pull Request

## Related issue
Fixes #5190

## What does this PR do?
- Improve a test just to make sure this issue never arises again
- Fix the issue

For the reviewer: Calling `add_documents` with an empty `mmap` seems to work, but does it impact the perf in a significant way? / 

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-12-31 17:11:10 +00:00 committed by GitHub
commit 5908aec6cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 105 additions and 6 deletions

View File

@ -1312,9 +1312,7 @@ impl IndexScheduler {
if let DocumentOperation::Add(content_uuid) = operation {
let content_file = self.file_store.get_update(*content_uuid)?;
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
if !mmap.is_empty() {
content_files.push(mmap);
}
content_files.push(mmap);
}
}

View File

@ -291,7 +291,10 @@ impl IndexScheduler {
debug_assert!(old_task != *task);
debug_assert_eq!(old_task.uid, task.uid);
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
debug_assert!(
old_task.batch_uid.is_none() && task.batch_uid.is_some(),
"\n==> old: {old_task:?}\n==> new: {task:?}"
);
if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| {

View File

@ -1220,9 +1220,89 @@ async fn replace_document() {
#[actix_rt::test]
async fn add_no_documents() {
let server = Server::new().await;
let index = server.index("test");
let (_response, code) = index.add_documents(json!([]), None).await;
let index = server.index("kefir");
let (task, code) = index.add_documents(json!([]), None).await;
snapshot!(code, @"202 Accepted");
let task = server.wait_task(task.uid()).await;
let task = task.succeeded();
snapshot!(task, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 0,
"indexedDocuments": 0
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (task, _code) = index.add_documents(json!([]), Some("kefkef")).await;
let task = server.wait_task(task.uid()).await;
let task = task.succeeded();
snapshot!(task, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 0,
"indexedDocuments": 0
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (task, _code) = index.add_documents(json!([{ "kefkef": 1 }]), None).await;
let task = server.wait_task(task.uid()).await;
let task = task.succeeded();
snapshot!(task, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (documents, _status) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(documents, @r#"
{
"results": [
{
"kefkef": 1
}
],
"offset": 0,
"limit": 20,
"total": 1
}
"#);
}
#[actix_rt::test]

View File

@ -252,6 +252,24 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
previous_offset = iter.byte_offset();
}
if payload.is_empty() {
let result = retrieve_or_guess_primary_key(
rtxn,
index,
new_fields_ids_map,
primary_key_from_op,
None,
);
match result {
Ok(Ok((pk, _))) => {
primary_key.get_or_insert(pk);
}
Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
Err(error) => return Err(error),
};
}
Ok(new_docids_version_offsets)
}