diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 612a31337..917ed9d55 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -138,6 +138,9 @@ pub struct IndexScheduler { /// Get a signal when a batch needs to be processed. pub(crate) wake_up: Arc, + /// Weither autobatching is enabled or not. + pub(crate) autobatching_enabled: bool, + // ================= test /// The next entry is dedicated to the tests. /// It provide a way to break in multiple part of the scheduler. @@ -161,6 +164,7 @@ impl IndexScheduler { indexes_path: PathBuf, index_size: usize, indexer_config: IndexerConfig, + autobatching_enabled: bool, #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender, ) -> Result { std::fs::create_dir_all(&tasks_path)?; @@ -187,6 +191,7 @@ impl IndexScheduler { env, // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things wake_up: Arc::new(SignalEvent::auto(true)), + autobatching_enabled, #[cfg(test)] test_breakpoint_sdr, @@ -208,6 +213,7 @@ impl IndexScheduler { index_tasks: self.index_tasks, index_mapper: self.index_mapper.clone(), wake_up: self.wake_up.clone(), + autobatching_enabled: self.autobatching_enabled, #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), @@ -454,7 +460,7 @@ mod tests { use super::*; impl IndexScheduler { - pub fn test() -> (Self, IndexSchedulerHandle) { + pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) { let tempdir = TempDir::new().unwrap(); let (sender, receiver) = crossbeam::channel::bounded(0); @@ -464,6 +470,7 @@ mod tests { tempdir.path().join("indexes"), 1024 * 1024, IndexerConfig::default(), + autobatching, // enable autobatching sender, ) .unwrap(); @@ -504,7 +511,7 @@ mod tests { #[test] fn register() { - let (index_scheduler, handle) = IndexScheduler::test(); + let (index_scheduler, handle) = IndexScheduler::test(true); handle.dont_block(); let kinds = [ @@ -583,7 +590,7 @@ mod tests { #[test] fn insert_task_while_another_task_is_processing() { - let (index_scheduler, handle) = IndexScheduler::test(); + let (index_scheduler, handle) = IndexScheduler::test(true); index_scheduler.register(KindWithContent::Snapshot).unwrap(); handle.wait_till(Breakpoint::BatchCreated); @@ -607,7 +614,7 @@ mod tests { /// we send them very fast, we must make sure that they are all processed. #[test] fn process_tasks_inserted_without_new_signal() { - let (index_scheduler, handle) = IndexScheduler::test(); + let (index_scheduler, handle) = IndexScheduler::test(true); index_scheduler .register(KindWithContent::IndexCreation { @@ -640,9 +647,50 @@ mod tests { assert_eq!(tasks[2].status, Status::Succeeded); } + #[test] + fn process_tasks_without_autobatching() { + let (index_scheduler, handle) = IndexScheduler::test(false); + + index_scheduler + .register(KindWithContent::IndexCreation { + index_uid: S("doggos"), + primary_key: None, + }) + .unwrap(); + index_scheduler + .register(KindWithContent::DocumentClear { + index_uid: S("doggos"), + }) + .unwrap(); + index_scheduler + .register(KindWithContent::DocumentClear { + index_uid: S("doggos"), + }) + .unwrap(); + index_scheduler + .register(KindWithContent::DocumentClear { + index_uid: S("doggos"), + }) + .unwrap(); + + handle.wait_till(Breakpoint::Start); + handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); + handle.wait_till(Breakpoint::AfterProcessing); + + let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); + tasks.reverse(); + assert_eq!(tasks.len(), 4); + assert_eq!(tasks[0].status, Status::Succeeded); + assert_eq!(tasks[1].status, Status::Succeeded); + assert_eq!(tasks[2].status, Status::Succeeded); + assert_eq!(tasks[3].status, Status::Succeeded); + } + #[test] fn document_addition() { - let (index_scheduler, handle) = IndexScheduler::test(); + let (index_scheduler, handle) = IndexScheduler::test(true); let content = r#" { @@ -753,6 +801,6 @@ mod tests { #[test] fn simple_new() { - crate::IndexScheduler::test(); + crate::IndexScheduler::test(true); } }