diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 5a150ca93..35caea8ac 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -9,16 +9,19 @@ mod utils; use batch::Batch; pub use error::Error; use index::Index; -use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str}; pub use task::Task; use task::{Kind, KindWithContent, Status}; +use time::OffsetDateTime; +use update_file_store::UpdateFileStore; use std::collections::hash_map::Entry; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{collections::HashMap, sync::RwLock}; +use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str}; use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; +use milli::update::IndexDocumentsMethod; use milli::{RoaringBitmapCodec, BEU32}; use roaring::RoaringBitmap; use serde::Deserialize; @@ -54,6 +57,8 @@ pub struct IndexScheduler { /// The list of tasks currently processing. processing_tasks: Arc>, + file_store: UpdateFileStore, + /// The LMDB environment which the DBs are associated with. env: Env, @@ -326,7 +331,66 @@ impl IndexScheduler { Batch::Cancel(_) => todo!(), Batch::Snapshot(_) => todo!(), Batch::Dump(_) => todo!(), - Batch::Contiguous { tasks, kind } => todo!(), + Batch::Contiguous { tasks, kind } => { + // it's safe because you can't batch 0 contiguous tasks. + let first_task = &tasks[0]; + // and the two kind of tasks we batch MUST have ONE index name. + let index_name = first_task.indexes().unwrap()[0]; + let index = self.index(index_name)?; + + match kind { + Kind::DocumentAddition => { + let content_files = tasks.iter().map(|task| match &task.kind { + KindWithContent::DocumentAddition { content_file, .. } => { + content_file.clone() + } + k => unreachable!( + "Internal error, `{:?}` is not supposed to be reachable here", + k.as_kind() + ), + }); + let results = index.update_documents( + IndexDocumentsMethod::UpdateDocuments, + None, + self.file_store.clone(), + content_files, + )?; + + for (task, result) in tasks.iter_mut().zip(results) { + task.finished_at = Some(OffsetDateTime::now_utc()); + match result { + Ok(_) => task.status = Status::Succeeded, + Err(_) => task.status = Status::Succeeded, + } + } + } + Kind::DocumentDeletion => { + let ids: Vec<_> = tasks + .iter() + .flat_map(|task| match &task.kind { + KindWithContent::DocumentDeletion { documents_ids, .. } => { + documents_ids.clone() + } + k => unreachable!( + "Internal error, `{:?}` is not supposed to be reachable here", + k.as_kind() + ), + }) + .collect(); + + let result = index.delete_documents(&ids); + + for task in tasks.iter_mut() { + task.finished_at = Some(OffsetDateTime::now_utc()); + match result { + Ok(_) => task.status = Status::Succeeded, + Err(_) => task.status = Status::Succeeded, + } + } + } + _ => unreachable!(), + } + } Batch::Empty => todo!(), } diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index de14afcf6..3fdc1c2c3 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -2,6 +2,7 @@ use anyhow::Result; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use time::OffsetDateTime; +use uuid::Uuid; use crate::TaskId; @@ -56,7 +57,7 @@ pub enum KindWithContent { Snapshot, DocumentAddition { index_name: String, - content_file: String, + content_file: Uuid, }, DocumentDeletion { index_name: String,