diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 47e5c93d2..3d7a65c8c 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1716,12 +1716,15 @@ mod tests { allow_index_creation: true, }) .unwrap(); + index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler)); + index_scheduler.assert_internally_consistent(); // everything should be batched together. handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -1768,12 +1771,15 @@ mod tests { allow_index_creation: true, }) .unwrap(); + index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler)); + index_scheduler.assert_internally_consistent(); // everything should be batched together. handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -1822,16 +1828,19 @@ mod tests { allow_index_creation: true, }) .unwrap(); + index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler)); // Only half of the task should've been processed since we can't autobatch replace and update together. handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); @@ -1876,17 +1885,20 @@ mod tests { allow_index_creation: true, }) .unwrap(); + index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler)); // Nothing should be batched thus half of the tasks are processed. 
handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); // Everything is processed. handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -1933,17 +1945,20 @@ mod tests { allow_index_creation: true, }) .unwrap(); + index_scheduler.assert_internally_consistent(); } snapshot!(snapshot_index_scheduler(&index_scheduler)); // Nothing should be batched thus half of the tasks are processed. handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); // Everything is processed. handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler)); @@ -1960,360 +1975,6 @@ mod tests { snapshot!(serde_json::to_string_pretty(&documents).unwrap()); } - #[test] - fn test_document_addition_cant_create_index_without_index() { - // We're going to autobatch multiple document addition that don't have - // the right to create an index while there is no index currently. - // Thus, everything should be batched together and a IndexDoesNotExists - // error should be throwed. 
- let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); - - for i in 0..10 { - let content = format!( - r#"{{ - "id": {}, - "doggo": "bob {}" - }}"#, - i, i - ); - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) - .unwrap(); - } - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Everything should be batched together. - handle.advance_n_batch(1); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // The index should not exists. - snapshot!(format!("{}", index_scheduler.index("doggos").map(|_| ()).unwrap_err()), @"Index `doggos` not found."); - } - - #[test] - fn test_document_addition_cant_create_index_without_index_without_autobatching() { - // We're going to execute multiple document addition that don't have - // the right to create an index while there is no index currently. - // Since the autobatching is disabled, every tasks should be processed - // sequentially and throw an IndexDoesNotExists. 
- let (index_scheduler, handle) = IndexScheduler::test(false, vec![]); - - for i in 0..10 { - let content = format!( - r#"{{ - "id": {}, - "doggo": "bob {}" - }}"#, - i, i - ); - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) - .unwrap(); - } - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Nothing should be batched thus half of the tasks are processed. - handle.advance_n_batch(5); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Everything is processed. - handle.advance_n_batch(5); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // The index should not exists. - snapshot!(format!("{}", index_scheduler.index("doggos").map(|_| ()).unwrap_err()), @"Index `doggos` not found."); - } - - #[test] - fn test_document_addition_cant_create_index_with_index() { - // We're going to autobatch multiple document addition that don't have - // the right to create an index while there is already an index. - // Thus, everything should be batched together and no error should be - // throwed. - let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); - - // Create the index. 
- index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) - .unwrap(); - handle.advance_n_batch(1); - - for i in 0..10 { - let content = format!( - r#"{{ - "id": {}, - "doggo": "bob {}" - }}"#, - i, i - ); - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) - .unwrap(); - } - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Everything should be batched together. - handle.advance_n_batch(1); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Has everything being pushed successfully in milli? - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::<Vec<_>>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::<Vec<_>>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap()); - } - - #[test] - fn test_document_addition_cant_create_index_with_index_without_autobatching() { - // We're going to execute multiple document addition that don't have - // the right to create an index while there is no index currently. - // Since the autobatching is disabled, every tasks should be processed - // sequentially and throw an IndexDoesNotExists. 
- index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) - .unwrap(); - handle.advance_n_batch(1); - - for i in 0..10 { - let content = format!( - r#"{{ - "id": {}, - "doggo": "bob {}" - }}"#, - i, i - ); - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation: false, - }) - .unwrap(); - } - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Nothing should be batched thus half of the tasks are processed. - handle.advance_n_batch(5); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Everything is processed. - handle.advance_n_batch(5); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Has everything being pushed successfully in milli? - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::<Vec<_>>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::<Vec<_>>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap()); - } - - #[test] - fn test_document_addition_mixed_rights_with_index() { - // We're going to autobatch multiple document addition. - // - The index already exists - // - The first document addition don't have the right to create an index - // can it batch with the other one? - let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); - - // Create the index. 
- index_scheduler - .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) - .unwrap(); - handle.advance_n_batch(1); - - for i in 0..10 { - let content = format!( - r#"{{ - "id": {}, - "doggo": "bob {}" - }}"#, - i, i - ); - let allow_index_creation = if i % 2 == 0 { false } else { true }; - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation, - }) - .unwrap(); - } - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Everything should be batched together. - handle.advance_n_batch(1); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Has everything being pushed successfully in milli? - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::<Vec<_>>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::<Vec<_>>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap()); - } - - #[test] - fn test_document_addition_mixed_right_without_index_starts_with_cant_create() { - // We're going to autobatch multiple document addition. - // - The index does not exists - // - The first document addition don't have the right to create an index - // - The second do. They should not batch together. - // - The second should batch with everything else as it's going to create an index. 
- let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); - - for i in 0..10 { - let content = format!( - r#"{{ - "id": {}, - "doggo": "bob {}" - }}"#, - i, i - ); - let allow_index_creation = if i % 2 == 0 { false } else { true }; - - let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = meilisearch_types::document_formats::read_json( - content.as_bytes(), - file.as_file_mut(), - ) - .unwrap() as u64; - file.persist().unwrap(); - index_scheduler - .register(KindWithContent::DocumentAdditionOrUpdate { - index_uid: S("doggos"), - primary_key: Some(S("id")), - method: ReplaceDocuments, - content_file: uuid, - documents_count, - allow_index_creation, - }) - .unwrap(); - } - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // A first batch should be processed with only the first documentAddition that's going to fail. - handle.advance_n_batch(1); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Everything else should be batched together. - handle.advance_n_batch(1); - - snapshot!(snapshot_index_scheduler(&index_scheduler)); - - // Has everything being pushed successfully in milli? - let index = index_scheduler.index("doggos").unwrap(); - let rtxn = index.read_txn().unwrap(); - let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let field_ids = field_ids_map.ids().collect::<Vec<_>>(); - let documents = index - .all_documents(&rtxn) - .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) - .collect::<Vec<_>>(); - snapshot!(serde_json::to_string_pretty(&documents).unwrap()); - } - #[macro_export] macro_rules! 
debug_snapshot { ($value:expr, @$snapshot:literal) => {{ @@ -2622,6 +2283,379 @@ mod tests { assert!(test_duration.as_millis() > 1000); } + #[test] + fn test_document_addition_cant_create_index_without_index() { + // We're going to autobatch multiple document addition that don't have + // the right to create an index while there is no index currently. + // Thus, everything should be batched together and a IndexDoesNotExists + // error should be throwed. + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); + + for i in 0..10 { + let content = format!( + r#"{{ + "id": {}, + "doggo": "bob {}" + }}"#, + i, i + ); + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); + let documents_count = meilisearch_types::document_formats::read_json( + content.as_bytes(), + file.as_file_mut(), + ) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + } + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Everything should be batched together. + handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // The index should not exists. + snapshot!(format!("{}", index_scheduler.index("doggos").map(|_| ()).unwrap_err()), @"Index `doggos` not found."); + } + + #[test] + fn test_document_addition_cant_create_index_without_index_without_autobatching() { + // We're going to execute multiple document addition that don't have + // the right to create an index while there is no index currently. + // Since the autobatching is disabled, every tasks should be processed + // sequentially and throw an IndexDoesNotExists. 
+ let (index_scheduler, handle) = IndexScheduler::test(false, vec![]); + + for i in 0..10 { + let content = format!( + r#"{{ + "id": {}, + "doggo": "bob {}" + }}"#, + i, i + ); + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); + let documents_count = meilisearch_types::document_formats::read_json( + content.as_bytes(), + file.as_file_mut(), + ) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + } + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Nothing should be batched thus half of the tasks are processed. + handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Everything is processed. + handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // The index should not exists. + snapshot!(format!("{}", index_scheduler.index("doggos").map(|_| ()).unwrap_err()), @"Index `doggos` not found."); + } + + #[test] + fn test_document_addition_cant_create_index_with_index() { + // We're going to autobatch multiple document addition that don't have + // the right to create an index while there is already an index. + // Thus, everything should be batched together and no error should be + // throwed. + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); + + // Create the index. 
+ index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + for i in 0..10 { + let content = format!( + r#"{{ + "id": {}, + "doggo": "bob {}" + }}"#, + i, i + ); + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); + let documents_count = meilisearch_types::document_formats::read_json( + content.as_bytes(), + file.as_file_mut(), + ) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + } + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Everything should be batched together. + handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Has everything being pushed successfully in milli? + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::<Vec<_>>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::<Vec<_>>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap()); + } + + #[test] + fn test_document_addition_cant_create_index_with_index_without_autobatching() { + // We're going to execute multiple document addition that don't have + // the right to create an index while there is no index currently. 
+ // Since the autobatching is disabled, every tasks should be processed + // sequentially and throw an IndexDoesNotExists. + let (index_scheduler, handle) = IndexScheduler::test(false, vec![]); + + // Create the index. + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .unwrap(); + handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + for i in 0..10 { + let content = format!( + r#"{{ + "id": {}, + "doggo": "bob {}" + }}"#, + i, i + ); + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); + let documents_count = meilisearch_types::document_formats::read_json( + content.as_bytes(), + file.as_file_mut(), + ) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: false, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + } + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Nothing should be batched thus half of the tasks are processed. + handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Everything is processed. + handle.advance_n_batch(5); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Has everything being pushed successfully in milli? 
+ let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::<Vec<_>>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::<Vec<_>>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap()); + } + + #[test] + fn test_document_addition_mixed_rights_with_index() { + // We're going to autobatch multiple document addition. + // - The index already exists + // - The first document addition don't have the right to create an index + // can it batch with the other one? + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); + + // Create the index. + index_scheduler + .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None }) + .unwrap(); + handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + for i in 0..10 { + let content = format!( + r#"{{ + "id": {}, + "doggo": "bob {}" + }}"#, + i, i + ); + let allow_index_creation = if i % 2 == 0 { false } else { true }; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); + let documents_count = meilisearch_types::document_formats::read_json( + content.as_bytes(), + file.as_file_mut(), + ) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + } + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Everything should be batched together. 
+ handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Has everything being pushed successfully in milli? + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::<Vec<_>>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::<Vec<_>>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap()); + } + + #[test] + fn test_document_addition_mixed_right_without_index_starts_with_cant_create() { + // We're going to autobatch multiple document addition. + // - The index does not exists + // - The first document addition don't have the right to create an index + // - The second do. They should not batch together. + // - The second should batch with everything else as it's going to create an index. + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); + + for i in 0..10 { + let content = format!( + r#"{{ + "id": {}, + "doggo": "bob {}" + }}"#, + i, i + ); + let allow_index_creation = if i % 2 == 0 { false } else { true }; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); + let documents_count = meilisearch_types::document_formats::read_json( + content.as_bytes(), + file.as_file_mut(), + ) + .unwrap() as u64; + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation, + }) + .unwrap(); + index_scheduler.assert_internally_consistent(); + } + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // A first batch should be processed with only the first documentAddition that's going to fail. 
+ handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Everything else should be batched together. + handle.advance_n_batch(1); + index_scheduler.assert_internally_consistent(); + + snapshot!(snapshot_index_scheduler(&index_scheduler)); + + // Has everything being pushed successfully in milli? + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::<Vec<_>>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::<Vec<_>>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap()); + } + #[test] fn panic_in_process_batch_for_index_creation() { let (index_scheduler, handle) = diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap b/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap index 646f93cd6..e398ab205 100644 --- a/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap @@ -40,7 +40,7 @@ wolfo [2,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] -[timestamp] [2,] +[timestamp] [1,2,] [timestamp] [3,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion/2.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion/2.snap index 800b5a9b4..6954d37e0 100644 --- a/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion/2.snap @@ -32,11 +32,11 @@ doggos [0,1,2,] 
---------------------------------------------------------------------- ### Started At: [timestamp] [0,] -[timestamp] [2,] +[timestamp] [1,2,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] -[timestamp] [2,] +[timestamp] [1,2,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion_on_unexisting_index/2.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion_on_unexisting_index/2.snap index e29b7216e..2abd3e4cf 100644 --- a/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion_on_unexisting_index/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_index_deletion_on_unexisting_index/2.snap @@ -28,10 +28,10 @@ doggos [0,1,] [timestamp] [1,] ---------------------------------------------------------------------- ### Started At: -[timestamp] [1,] +[timestamp] [0,1,] ---------------------------------------------------------------------- ### Finished At: -[timestamp] [1,] +[timestamp] [0,1,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_with_index/2.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_with_index/2.snap index 1343ec519..1fac082df 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_with_index/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_with_index/2.snap @@ -47,11 +47,11 @@ doggos [0,1,2,3,4,5,6,7,8,9,10,] ---------------------------------------------------------------------- ### Started At: [timestamp] [0,] -[timestamp] [10,] +[timestamp] [1,2,3,4,5,6,7,8,9,10,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] 
-[timestamp] [10,] +[timestamp] [1,2,3,4,5,6,7,8,9,10,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_without_index/2.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_without_index/2.snap index aaeca2c00..983bde528 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_without_index/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_cant_create_index_without_index/2.snap @@ -43,10 +43,10 @@ doggos [0,1,2,3,4,5,6,7,8,9,] [timestamp] [9,] ---------------------------------------------------------------------- ### Started At: -[timestamp] [9,] +[timestamp] [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### Finished At: -[timestamp] [9,] +[timestamp] [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_right_without_index_starts_with_cant_create/3.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_right_without_index_starts_with_cant_create/3.snap index cc11606fc..88a3866a7 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_right_without_index_starts_with_cant_create/3.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_right_without_index_starts_with_cant_create/3.snap @@ -45,11 +45,11 @@ doggos [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### Started At: [timestamp] [0,] -[timestamp] [9,] +[timestamp] [1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] -[timestamp] [9,] +[timestamp] [1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- 
### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_rights_with_index/2.snap b/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_rights_with_index/2.snap index 914b22c0b..09e43e490 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_rights_with_index/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_addition_mixed_rights_with_index/2.snap @@ -47,11 +47,11 @@ doggos [0,1,2,3,4,5,6,7,8,9,10,] ---------------------------------------------------------------------- ### Started At: [timestamp] [0,] -[timestamp] [10,] +[timestamp] [1,2,3,4,5,6,7,8,9,10,] ---------------------------------------------------------------------- ### Finished At: [timestamp] [0,] -[timestamp] [10,] +[timestamp] [1,2,3,4,5,6,7,8,9,10,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_replace/2.snap b/index-scheduler/src/snapshots/lib.rs/test_document_replace/2.snap index 8eff0d704..06c8fb066 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_replace/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_replace/2.snap @@ -43,10 +43,10 @@ doggos [0,1,2,3,4,5,6,7,8,9,] [timestamp] [9,] ---------------------------------------------------------------------- ### Started At: -[timestamp] [9,] +[timestamp] [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### Finished At: -[timestamp] [9,] +[timestamp] [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/snapshots/lib.rs/test_document_update/2.snap b/index-scheduler/src/snapshots/lib.rs/test_document_update/2.snap index 0a7697a91..68d640fea 100644 --- a/index-scheduler/src/snapshots/lib.rs/test_document_update/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/test_document_update/2.snap @@ -43,10 
+43,10 @@ doggos [0,1,2,3,4,5,6,7,8,9,] [timestamp] [9,] ---------------------------------------------------------------------- ### Started At: -[timestamp] [9,] +[timestamp] [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### Finished At: -[timestamp] [9,] +[timestamp] [0,1,2,3,4,5,6,7,8,9,] ---------------------------------------------------------------------- ### File Store: diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index ba4cf3954..07a2161a4 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -175,7 +175,7 @@ pub(crate) fn insert_task_datetime( let timestamp = BEI128::new(time.unix_timestamp_nanos()); let mut task_ids = database.get(wtxn, &timestamp)?.unwrap_or_default(); task_ids.insert(task_id); - database.put(wtxn, &timestamp, &RoaringBitmap::from_iter([task_id]))?; + database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(task_ids))?; Ok(()) } @@ -191,7 +191,7 @@ pub(crate) fn remove_task_datetime( if existing.is_empty() { database.delete(wtxn, &timestamp)?; } else { - database.put(wtxn, &timestamp, &RoaringBitmap::from_iter([task_id]))?; + database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(existing))?; } } @@ -297,7 +297,7 @@ impl IndexScheduler { details, status, kind, - } = task; + } = task.clone(); assert_eq!(uid, task.uid); if let Some(task_index_uid) = &task_index_uid { assert!(self @@ -319,6 +319,9 @@ impl IndexScheduler { .get(&rtxn, &BEI128::new(started_at.unix_timestamp_nanos())) .unwrap() .unwrap(); + if !db_started_at.contains(task_id) { + dbg!(&task); + } + assert!(db_started_at.contains(task_id)); } if let Some(finished_at) = finished_at {