Merge remote-tracking branch 'origin/main' into temp-wildcard

This commit is contained in:
Clément Renault 2023-02-09 13:14:05 +01:00
commit 4570d5bf3a
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
164 changed files with 8353 additions and 2473 deletions

View file

@ -453,6 +453,10 @@ impl IndexScheduler {
&self.index_mapper.indexer_config
}
pub fn size(&self) -> Result<u64> {
Ok(self.env.real_disk_size()?)
}
/// Return the index corresponding to the name.
///
/// * If the index wasn't opened before, the index will be opened.
@ -503,13 +507,22 @@ impl IndexScheduler {
}
if let Some(canceled_by) = &query.canceled_by {
let mut all_canceled_tasks = RoaringBitmap::new();
for cancel_task_uid in canceled_by {
if let Some(canceled_by_uid) =
self.canceled_by.get(rtxn, &BEU32::new(*cancel_task_uid))?
{
tasks &= canceled_by_uid;
all_canceled_tasks |= canceled_by_uid;
}
}
// if the canceled_by has been specified but no task
// matches then we prefer matching zero than all tasks.
if all_canceled_tasks.is_empty() {
return Ok(RoaringBitmap::new());
} else {
tasks &= all_canceled_tasks;
}
}
if let Some(kind) = &query.types {
@ -890,6 +903,11 @@ impl IndexScheduler {
Ok(self.file_store.new_update_with_uuid(uuid)?)
}
/// The size on disk taken by all the updates files contained in the `IndexScheduler`, in bytes.
pub fn compute_update_file_size(&self) -> Result<u64> {
Ok(self.file_store.compute_total_size()?)
}
/// Delete a file from the index scheduler.
///
/// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method.
@ -1992,7 +2010,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -2039,7 +2057,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -2091,7 +2109,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -2142,7 +2160,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -2193,7 +2211,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[macro_export]
@ -2847,7 +2865,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -2910,7 +2928,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -2970,7 +2988,7 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
@ -3027,7 +3045,361 @@ mod tests {
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap());
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn test_document_addition_with_multiple_primary_key() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
for (id, primary_key) in ["id", "bork", "bloup"].iter().enumerate() {
let content = format!(
r#"{{
"id": {id},
"doggo": "jean bob"
}}"#,
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S(primary_key)),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_3_tasks");
// A first batch should be processed with only the first documentAddition.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
// The second batch should fail.
handle.advance_one_failed_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
// The second batch should fail.
handle.advance_one_failed_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails");
// Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id");
// Is the document still the one we expect?.
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn test_document_addition_with_multiple_primary_key_batch_wrong_key() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
for (id, primary_key) in ["id", "bork", "bork"].iter().enumerate() {
let content = format!(
r#"{{
"id": {id},
"doggo": "jean bob"
}}"#,
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S(primary_key)),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_3_tasks");
// A first batch should be processed with only the first documentAddition.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
// The second batch should fail and contains two tasks.
handle.advance_one_failed_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails");
// Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id");
// Is the document still the one we expect?.
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn test_document_addition_with_bad_primary_key() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
for (id, primary_key) in ["bork", "bork", "id", "bork", "id"].iter().enumerate() {
let content = format!(
r#"{{
"id": {id},
"doggo": "jean bob"
}}"#,
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S(primary_key)),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_5_tasks");
// A first batch should be processed with only the first two documentAddition.
// it should fails because the documents don't contains any `bork` field.
// NOTE: it's marked as successful because the batch didn't fails, it's the individual tasks that failed.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_and_second_task_fails");
// The primary key should be set to none since we failed the batch.
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap();
snapshot!(primary_key.is_none(), @"true");
// The second batch should succeed and only contains one task.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_succeeds");
// The primary key should be set to `id` since this batch succeeded.
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id");
// We're trying to `bork` again, but now there is already a primary key set for this index.
handle.advance_one_failed_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_task_fails");
// Finally the last task should succeed since its primary key is the same as the valid one.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fifth_task_succeeds");
// Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id");
// Is the document still the one we expect?.
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn test_document_addition_with_set_and_null_primary_key() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
for (id, primary_key) in
[None, Some("bork"), Some("paw"), None, None, Some("paw")].into_iter().enumerate()
{
let content = format!(
r#"{{
"paw": {id},
"doggo": "jean bob"
}}"#,
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: primary_key.map(|pk| pk.to_string()),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_6_tasks");
// A first batch should contains only one task that fails because we can't infer the primary key.
// NOTE: it's marked as successful because the batch didn't fails, it's the individual tasks that failed.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_task_fails");
// The second batch should contains only one task that fails because we bork is not a valid primary key.
// NOTE: it's marked as successful because the batch didn't fails, it's the individual tasks that failed.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
// No primary key should be set at this point.
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap();
snapshot!(primary_key.is_none(), @"true");
// The third batch should succeed and only contains one task.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_succeeds");
// The primary key should be set to `id` since this batch succeeded.
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"paw");
// We should be able to batch together the next two tasks that don't specify any primary key
// + the last task that matches the current primary-key. Everything should succeed.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_other_tasks_succeeds");
// Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"paw");
// Is the document still the one we expect?.
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn test_document_addition_with_set_and_null_primary_key_inference_works() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
for (id, primary_key) in [None, Some("bork"), Some("doggoid"), None, None, Some("doggoid")]
.into_iter()
.enumerate()
{
let content = format!(
r#"{{
"doggoid": {id},
"doggo": "jean bob"
}}"#,
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: primary_key.map(|pk| pk.to_string()),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_registering_the_6_tasks");
// A first batch should contains only one task that succeed and sets the primary key to `doggoid`.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_task_succeed");
// Checking the primary key.
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap();
snapshot!(primary_key.is_none(), @"false");
// The second batch should contains only one task that fails because it tries to update the primary key to `bork`.
handle.advance_one_failed_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
// The third batch should succeed and only contains one task.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_succeeds");
// We should be able to batch together the next two tasks that don't specify any primary key
// + the last task that matches the current primary-key. Everything should succeed.
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_other_tasks_succeeds");
// Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"doggoid");
// Is the document still the one we expect?.
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]