diff --git a/crates/dump/README.md b/crates/dump/README.md
index 3537f188e..42d84ec80 100644
--- a/crates/dump/README.md
+++ b/crates/dump/README.md
@@ -10,8 +10,10 @@ dump
 ├── instance-uid.uuid
 ├── keys.jsonl
 ├── metadata.json
-└── tasks
-    ├── update_files
-    │   └── [task_id].jsonl
+├── tasks
+│   ├── update_files
+│   │   └── [task_id].jsonl
+│   └── queue.jsonl
+└── batches
     └── queue.jsonl
-```
\ No newline at end of file
+```
diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs
index ad2d96e1c..905a6485d 100644
--- a/crates/dump/src/lib.rs
+++ b/crates/dump/src/lib.rs
@@ -228,6 +228,7 @@ pub(crate) mod test {
 
     use big_s::S;
     use maplit::{btreemap, btreeset};
+    use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
     use meilisearch_types::facet_values_sort::FacetValuesSort;
     use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
     use meilisearch_types::index_uid_pattern::IndexUidPattern;
@@ -235,7 +236,8 @@ pub(crate) mod test {
     use meilisearch_types::milli;
     use meilisearch_types::milli::update::Setting;
     use meilisearch_types::settings::{Checked, FacetingSettings, Settings};
-    use meilisearch_types::tasks::{Details, Status};
+    use meilisearch_types::task_view::DetailsView;
+    use meilisearch_types::tasks::{Details, Kind, Status};
     use serde_json::{json, Map, Value};
     use time::macros::datetime;
     use uuid::Uuid;
@@ -305,6 +307,30 @@ pub(crate) mod test {
         settings.check()
     }
 
+    pub fn create_test_batches() -> Vec<Batch> {
+        vec![Batch {
+            uid: 0,
+            details: DetailsView {
+                received_documents: Some(12),
+                indexed_documents: Some(Some(10)),
+                ..DetailsView::default()
+            },
+            progress: None,
+            stats: BatchStats {
+                total_nb_tasks: 1,
+                status: maplit::btreemap! { Status::Succeeded => 1 },
+                types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 },
+                index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
+            },
+            enqueued_at: Some(BatchEnqueuedAt {
+                earliest: datetime!(2022-11-11 0:00 UTC),
+                oldest: datetime!(2022-11-11 0:00 UTC),
+            }),
+            started_at: datetime!(2022-11-20 0:00 UTC),
+            finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
+        }]
+    }
+
     pub fn create_test_tasks() -> Vec<(TaskDump, Option<Vec<Document>>)> {
         vec![
             (
@@ -427,6 +453,15 @@ pub(crate) mod test {
         index.flush().unwrap();
         index.settings(&settings).unwrap();
 
+        // ========== pushing the batch queue
+        let batches = create_test_batches();
+
+        let mut batch_queue = dump.create_batches_queue().unwrap();
+        for batch in &batches {
+            batch_queue.push_batch(batch).unwrap();
+        }
+        batch_queue.flush().unwrap();
+
         // ========== pushing the task queue
         let tasks = create_test_tasks();
 
diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs
index ec74fa4fd..2b4440ab7 100644
--- a/crates/dump/src/reader/mod.rs
+++ b/crates/dump/src/reader/mod.rs
@@ -102,6 +102,13 @@ impl DumpReader {
         }
     }
 
+    pub fn batches(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Batch>> + '_>> {
+        match self {
+            DumpReader::Current(current) => Ok(current.batches()),
+            DumpReader::Compat(_compat) => Ok(Box::new(std::iter::empty())),
+        }
+    }
+
     pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Key>> + '_>> {
         match self {
             DumpReader::Current(current) => Ok(current.keys()),
@@ -227,6 +234,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2024-05-16 15:51:34.151044 +00:00:00");
         insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -348,6 +359,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2023-07-06 7:10:27.21958 +00:00:00");
         insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -412,6 +427,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00");
         insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d");
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -492,6 +511,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-06 12:53:49.131989609 +00:00:00");
         insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d");
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -569,6 +592,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00");
         assert_eq!(dump.instance_uid().unwrap(), None);
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -662,6 +689,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-09 20:27:59.904096267 +00:00:00");
         assert_eq!(dump.instance_uid().unwrap(), None);
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -755,6 +786,10 @@ pub(crate) mod test {
         insta::assert_snapshot!(dump.date().unwrap(), @"2023-01-30 16:26:09.247261 +00:00:00");
         assert_eq!(dump.instance_uid().unwrap(), None);
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -831,6 +866,10 @@ pub(crate) mod test {
         assert_eq!(dump.date(), None);
         assert_eq!(dump.instance_uid().unwrap(), None);
 
+        // batches didn't exists at the time
+        let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
+        meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");
+
         // tasks
         let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
         let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
diff --git a/crates/dump/src/reader/v6/mod.rs b/crates/dump/src/reader/v6/mod.rs
index 4c05f16bf..9e0d07c78 100644
--- a/crates/dump/src/reader/v6/mod.rs
+++ b/crates/dump/src/reader/v6/mod.rs
@@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked;
 pub type Unchecked = meilisearch_types::settings::Unchecked;
 
 pub type Task = crate::TaskDump;
+pub type Batch = meilisearch_types::batches::Batch;
 pub type Key = meilisearch_types::keys::Key;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
 pub type Network = meilisearch_types::features::Network;
@@ -49,6 +50,7 @@ pub struct V6Reader {
     instance_uid: Option<Uuid>,
     metadata: Metadata,
     tasks: BufReader<File>,
+    batches: Option<BufReader<File>>,
     keys: BufReader<File>,
     features: Option<RuntimeTogglableFeatures>,
     network: Option<Network>,
@@ -79,6 +81,12 @@ impl V6Reader {
         } else {
             None
         };
+        let batches = match File::open(dump.path().join("batches").join("queue.jsonl")) {
+            Ok(file) => Some(BufReader::new(file)),
+            // The batch file was only introduced during the v1.13, anything prior to that won't have batches
+            Err(err) if err.kind() == ErrorKind::NotFound => None,
+            Err(e) => return Err(e.into()),
+        };
 
         let network_file = match fs::read(dump.path().join("network.json")) {
             Ok(network_file) => Some(network_file),
@@ -101,6 +109,7 @@ impl V6Reader {
             metadata: serde_json::from_reader(&*meta_file)?,
             instance_uid,
             tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
+            batches,
             keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
             features,
             network,
@@ -144,7 +153,7 @@ impl V6Reader {
         &mut self,
     ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
         Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
-            let task: Task = serde_json::from_str(&line?).unwrap();
+            let task: Task = serde_json::from_str(&line?)?;
 
             let update_file_path = self
                 .dump
@@ -156,8 +165,7 @@ impl V6Reader {
             if update_file_path.exists() {
                 Ok((
                     task,
-                    Some(Box::new(UpdateFile::new(&update_file_path).unwrap())
-                        as Box<super::UpdateFile>),
+                    Some(Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>),
                 ))
             } else {
                 Ok((task, None))
@@ -165,6 +173,16 @@ impl V6Reader {
         }))
     }
 
+    pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
+        match self.batches.as_mut() {
+            Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
+                let batch = serde_json::from_str(&line?)?;
+                Ok(batch)
+            })),
+            None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
+        }
+    }
+
     pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> {
         Box::new(
             (&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
diff --git a/crates/dump/src/writer.rs b/crates/dump/src/writer.rs
index 923147c63..bfe091ab5 100644
--- a/crates/dump/src/writer.rs
+++ b/crates/dump/src/writer.rs
@@ -4,6 +4,7 @@ use std::path::PathBuf;
 
 use flate2::write::GzEncoder;
 use flate2::Compression;
+use meilisearch_types::batches::Batch;
 use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
@@ -54,6 +55,10 @@ impl DumpWriter {
         TaskWriter::new(self.dir.path().join("tasks"))
     }
 
+    pub fn create_batches_queue(&self) -> Result<BatchWriter> {
+        BatchWriter::new(self.dir.path().join("batches"))
+    }
+
     pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
         Ok(std::fs::write(
             self.dir.path().join("experimental-features.json"),
@@ -130,6 +135,30 @@ impl TaskWriter {
     }
 }
 
+pub struct BatchWriter {
+    queue: BufWriter<File>,
+}
+
+impl BatchWriter {
+    pub(crate) fn new(path: PathBuf) -> Result<BatchWriter> {
+        std::fs::create_dir(&path)?;
+        let queue = File::create(path.join("queue.jsonl"))?;
+        Ok(BatchWriter { queue: BufWriter::new(queue) })
+    }
+
+    /// Pushes batches in the dump.
+    pub fn push_batch(&mut self, batch: &Batch) -> Result<()> {
+        self.queue.write_all(&serde_json::to_vec(batch)?)?;
+        self.queue.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn flush(mut self) -> Result<()> {
+        self.queue.flush()?;
+        Ok(())
+    }
+}
+
 pub struct UpdateFile {
     path: PathBuf,
     writer: Option<BufWriter<File>>,
@@ -209,8 +238,8 @@ pub(crate) mod test {
     use super::*;
     use crate::reader::Document;
     use crate::test::{
-        create_test_api_keys, create_test_documents, create_test_dump, create_test_instance_uid,
-        create_test_settings, create_test_tasks,
+        create_test_api_keys, create_test_batches, create_test_documents, create_test_dump,
+        create_test_instance_uid, create_test_settings, create_test_tasks,
     };
 
     fn create_directory_hierarchy(dir: &Path) -> String {
@@ -285,8 +314,10 @@ pub(crate) mod test {
         let dump_path = dump.path();
 
         // ==== checking global file hierarchy (we want to be sure there isn't too many files or too few)
-        insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r###"
+        insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r"
         .
+        ├---- batches/
+        │    └---- queue.jsonl
         ├---- indexes/
         │    └---- doggos/
         │    │    ├---- documents.jsonl
@@ -301,7 +332,7 @@ pub(crate) mod test {
         ├---- keys.jsonl
         ├---- metadata.json
         └---- network.json
-        "###);
+        ");
 
         // ==== checking the top level infos
         let metadata = fs::read_to_string(dump_path.join("metadata.json")).unwrap();
@@ -354,6 +385,16 @@ pub(crate) mod test {
             }
         }
 
+        // ==== checking the batch queue
+        let batches_queue = fs::read_to_string(dump_path.join("batches/queue.jsonl")).unwrap();
+        for (batch, expected) in batches_queue.lines().zip(create_test_batches()) {
+            let mut batch = serde_json::from_str::<Batch>(batch).unwrap();
+            if batch.details.settings == Some(Box::new(Settings::<Unchecked>::default())) {
+                batch.details.settings = None;
+            }
+            assert_eq!(batch, expected, "{batch:#?}{expected:#?}");
+        }
+
         // ==== checking the keys
         let keys = fs::read_to_string(dump_path.join("keys.jsonl")).unwrap();
         for (key, expected) in keys.lines().zip(create_test_api_keys()) {
diff --git a/crates/index-scheduler/src/dump.rs b/crates/index-scheduler/src/dump.rs
index 7e0341fcb..ca26e50c8 100644
--- a/crates/index-scheduler/src/dump.rs
+++ b/crates/index-scheduler/src/dump.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 use std::io;
 
 use dump::{KindDump, TaskDump, UpdateFile};
+use meilisearch_types::batches::{Batch, BatchId};
 use meilisearch_types::heed::RwTxn;
 use meilisearch_types::milli;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
@@ -14,9 +15,15 @@ pub struct Dump<'a> {
     index_scheduler: &'a IndexScheduler,
     wtxn: RwTxn<'a>,
 
+    batch_to_task_mapping: HashMap<BatchId, RoaringBitmap>,
+
     indexes: HashMap<String, RoaringBitmap>,
     statuses: HashMap<Status, RoaringBitmap>,
     kinds: HashMap<Kind, RoaringBitmap>,
+
+    batch_indexes: HashMap<String, RoaringBitmap>,
+    batch_statuses: HashMap<Status, RoaringBitmap>,
+    batch_kinds: HashMap<Kind, RoaringBitmap>,
 }
 
 impl<'a> Dump<'a> {
@@ -27,12 +34,72 @@ impl<'a> Dump<'a> {
         Ok(Dump {
             index_scheduler,
            wtxn,
+            batch_to_task_mapping: HashMap::new(),
             indexes: HashMap::new(),
             statuses: HashMap::new(),
             kinds: HashMap::new(),
+            batch_indexes: HashMap::new(),
+            batch_statuses: HashMap::new(),
+            batch_kinds: HashMap::new(),
         })
     }
 
+    /// Register a new batch coming from a dump in the scheduler.
+    /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
+    pub fn register_dumped_batch(&mut self, batch: Batch) -> Result<()> {
+        self.index_scheduler.queue.batches.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?;
+        if let Some(enqueued_at) = batch.enqueued_at {
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.queue.batches.enqueued_at,
+                enqueued_at.earliest,
+                batch.uid,
+            )?;
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.queue.batches.enqueued_at,
+                enqueued_at.oldest,
+                batch.uid,
+            )?;
+        }
+        utils::insert_task_datetime(
+            &mut self.wtxn,
+            self.index_scheduler.queue.batches.started_at,
+            batch.started_at,
+            batch.uid,
+        )?;
+        if let Some(finished_at) = batch.finished_at {
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.queue.batches.finished_at,
+                finished_at,
+                batch.uid,
+            )?;
+        }
+
+        for index in batch.stats.index_uids.keys() {
+            match self.batch_indexes.get_mut(index) {
+                Some(bitmap) => {
+                    bitmap.insert(batch.uid);
+                }
+                None => {
+                    let mut bitmap = RoaringBitmap::new();
+                    bitmap.insert(batch.uid);
+                    self.batch_indexes.insert(index.to_string(), bitmap);
+                }
+            };
+        }
+
+        for status in batch.stats.status.keys() {
+            self.batch_statuses.entry(*status).or_default().insert(batch.uid);
+        }
+        for kind in batch.stats.types.keys() {
+            self.batch_kinds.entry(*kind).or_default().insert(batch.uid);
+        }
+
+        Ok(())
+    }
+
     /// Register a new task coming from a dump in the scheduler.
     /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
     pub fn register_dumped_task(
@@ -149,6 +216,9 @@ impl<'a> Dump<'a> {
         };
 
         self.index_scheduler.queue.tasks.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
+        if let Some(batch_id) = task.batch_uid {
+            self.batch_to_task_mapping.entry(batch_id).or_default().insert(task.uid);
+        }
 
         for index in task.indexes() {
             match self.indexes.get_mut(index) {
@@ -198,6 +268,14 @@ impl<'a> Dump<'a> {
 
     /// Commit all the changes and exit the importing dump state
     pub fn finish(mut self) -> Result<()> {
+        for (batch_id, task_ids) in self.batch_to_task_mapping {
+            self.index_scheduler.queue.batch_to_tasks_mapping.put(
+                &mut self.wtxn,
+                &batch_id,
+                &task_ids,
+            )?;
+        }
+
         for (index, bitmap) in self.indexes {
             self.index_scheduler.queue.tasks.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
         }
@@ -208,6 +286,16 @@ impl<'a> Dump<'a> {
             self.index_scheduler.queue.tasks.put_kind(&mut self.wtxn, kind, &bitmap)?;
         }
 
+        for (index, bitmap) in self.batch_indexes {
+            self.index_scheduler.queue.batches.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
+        }
+        for (status, bitmap) in self.batch_statuses {
+            self.index_scheduler.queue.batches.put_status(&mut self.wtxn, status, &bitmap)?;
+        }
+        for (kind, bitmap) in self.batch_kinds {
+            self.index_scheduler.queue.batches.put_kind(&mut self.wtxn, kind, &bitmap)?;
+        }
+
         self.wtxn.commit()?;
         self.index_scheduler.scheduler.wake_up.signal();
 
diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs
index 58f01c770..fed26aeb7 100644
--- a/crates/index-scheduler/src/processing.rs
+++ b/crates/index-scheduler/src/processing.rs
@@ -96,6 +96,7 @@ make_enum_progress! {
         StartTheDumpCreation,
         DumpTheApiKeys,
         DumpTheTasks,
+        DumpTheBatches,
         DumpTheIndexes,
         DumpTheExperimentalFeatures,
         CompressTheDump,
diff --git a/crates/index-scheduler/src/scheduler/process_dump_creation.rs b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
index adf5a5b61..4a1aef44a 100644
--- a/crates/index-scheduler/src/scheduler/process_dump_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeMap;
 use std::fs::File;
 use std::io::BufWriter;
 use std::sync::atomic::Ordering;
@@ -11,7 +12,9 @@ use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
 use time::macros::format_description;
 use time::OffsetDateTime;
 
-use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress};
+use crate::processing::{
+    AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress,
+};
 use crate::{Error, IndexScheduler, Result};
 
 impl IndexScheduler {
@@ -102,7 +105,40 @@ impl IndexScheduler {
         }
         dump_tasks.flush()?;
 
-        // 3. Dump the indexes
+        // 3. dump the batches
+        progress.update_progress(DumpCreationProgress::DumpTheBatches);
+        let mut dump_batches = dump.create_batches_queue()?;
+
+        let (atomic, update_batch_progress) =
+            AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
+        progress.update_progress(update_batch_progress);
+
+        for ret in self.queue.batches.all_batches.iter(&rtxn)? {
+            if self.scheduler.must_stop_processing.get() {
+                return Err(Error::AbortedTask);
+            }
+
+            let (_, mut b) = ret?;
+            // In the case we're dumping ourselves we want to be marked as finished
+            // to not loop over ourselves indefinitely.
+            if b.uid == task.uid {
+                let finished_at = OffsetDateTime::now_utc();
+
+                // We're going to fake the date because we don't know if everything is going to go well.
+                // But we need to dump the task as finished and successful.
+                // If something fail everything will be set appropriately in the end.
+                let mut statuses = BTreeMap::new();
+                statuses.insert(Status::Succeeded, b.stats.total_nb_tasks);
+                b.stats.status = statuses;
+                b.finished_at = Some(finished_at);
+            }
+
+            dump_batches.push_batch(&b)?;
+            atomic.fetch_add(1, Ordering::Relaxed);
+        }
+        dump_batches.flush()?;
+
+        // 4. Dump the indexes
         progress.update_progress(DumpCreationProgress::DumpTheIndexes);
         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
         let mut count = 0;
@@ -142,7 +178,7 @@ impl IndexScheduler {
                 let documents = index
                     .all_documents(&rtxn)
                     .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
-                // 3.1. Dump the documents
+                // 4.1. Dump the documents
                 for ret in documents {
                     if self.scheduler.must_stop_processing.get() {
                         return Err(Error::AbortedTask);
@@ -204,7 +240,7 @@ impl IndexScheduler {
                     atomic.fetch_add(1, Ordering::Relaxed);
                 }
 
-                // 3.2. Dump the settings
+                // 4.2. Dump the settings
                 let settings = meilisearch_types::settings::settings(
                     index,
                     &rtxn,
@@ -215,7 +251,7 @@ impl IndexScheduler {
                 Ok(())
             })?;
 
-        // 4. Dump experimental feature settings
+        // 5. Dump experimental feature settings
         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
         let features = self.features().runtime_features();
         dump.create_experimental_features(features)?;
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index 462d314db..663f5cb8d 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -30,7 +30,21 @@ pub struct Batch {
     pub enqueued_at: Option<BatchEnqueuedAt>,
 }
 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+impl PartialEq for Batch {
+    fn eq(&self, other: &Self) -> bool {
+        let Self { uid, progress, details, stats, started_at, finished_at, enqueued_at } = self;
+
+        *uid == other.uid
+            && progress.is_none() == other.progress.is_none()
+            && details == &other.details
+            && stats == &other.stats
+            && started_at == &other.started_at
+            && finished_at == &other.finished_at
+            && enqueued_at == &other.enqueued_at
+    }
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct BatchEnqueuedAt {
     #[serde(with = "time::serde::rfc3339")]
     pub earliest: OffsetDateTime,
@@ -38,7 +52,7 @@ pub struct BatchEnqueuedAt {
     pub oldest: OffsetDateTime,
 }
 
-#[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
 #[serde(rename_all = "camelCase")]
 #[schema(rename_all = "camelCase")]
 pub struct BatchStats {
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index cbd299f26..9f7baba18 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -511,9 +511,15 @@ fn import_dump(
         index_scheduler.refresh_index_stats(&uid)?;
     }
 
+    // 5. Import the queue
     let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
+    // 5.1. Import the batches
+    for ret in dump_reader.batches()? {
+        let batch = ret?;
+        index_scheduler_dump.register_dumped_batch(batch)?;
+    }
 
-    // 5. Import the tasks.
+    // 5.2. Import the tasks
     for ret in dump_reader.tasks()? {
         let (task, file) = ret?;
         index_scheduler_dump.register_dumped_task(task, file)?;
diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs
index da0f9cbeb..a5bf8ecdb 100644
--- a/crates/meilitool/src/main.rs
+++ b/crates/meilitool/src/main.rs
@@ -8,6 +8,7 @@ use clap::{Parser, Subcommand};
 use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
 use meilisearch_auth::AuthController;
+use meilisearch_types::batches::Batch;
 use meilisearch_types::heed::types::{SerdeJson, Str};
 use meilisearch_types::heed::{
     CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
@@ -255,70 +256,86 @@ fn export_a_dump(
     eprintln!("Successfully dumped {count} keys!");
 
+    eprintln!("Dumping the queue");
     let rtxn = env.read_txn()?;
     let all_tasks: Database<BEU32, SerdeJson<Task>> =
         try_opening_database(&env, &rtxn, "all-tasks")?;
+    let all_batches: Database<BEU32, SerdeJson<Batch>> =
+        try_opening_database(&env, &rtxn, "all-batches")?;
     let index_mapping: Database<Str, UuidCodec> =
         try_opening_database(&env, &rtxn, "index-mapping")?;
 
-    if skip_enqueued_tasks {
-        eprintln!("Skip dumping the enqueued tasks...");
-    } else {
-        let mut dump_tasks = dump.create_tasks_queue()?;
-        let mut count = 0;
-        for ret in all_tasks.iter(&rtxn)? {
-            let (_, t) = ret?;
-            let status = t.status;
-            let content_file = t.content_uuid();
+    eprintln!("Dumping the tasks");
+    let mut dump_tasks = dump.create_tasks_queue()?;
+    let mut count_tasks = 0;
+    let mut count_enqueued_tasks = 0;
+    for ret in all_tasks.iter(&rtxn)? {
+        let (_, t) = ret?;
+        let status = t.status;
+        let content_file = t.content_uuid();
 
-            let mut dump_content_file = dump_tasks.push_task(&t.into())?;
+        if status == Status::Enqueued && skip_enqueued_tasks {
+            continue;
+        }
 
-            // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
-            if let Some(content_file_uuid) = content_file {
-                if status == Status::Enqueued {
-                    let content_file = file_store.get_update(content_file_uuid)?;
+        let mut dump_content_file = dump_tasks.push_task(&t.into())?;
 
-                    if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
-                        eprintln!("Dumping the enqueued tasks reading them in obkv format...");
-                        let reader =
-                            DocumentsBatchReader::from_reader(content_file).with_context(|| {
-                                format!("While reading content file {:?}", content_file_uuid)
-                            })?;
-                        let (mut cursor, documents_batch_index) =
-                            reader.into_cursor_and_fields_index();
-                        while let Some(doc) = cursor.next_document().with_context(|| {
-                            format!("While iterating on content file {:?}", content_file_uuid)
-                        })? {
-                            dump_content_file
-                                .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
-                        }
-                    } else {
-                        eprintln!(
-                            "Dumping the enqueued tasks reading them in JSON stream format..."
-                        );
-                        for document in
-                            serde_json::de::Deserializer::from_reader(content_file).into_iter()
-                        {
-                            let document = document.with_context(|| {
-                                format!("While reading content file {:?}", content_file_uuid)
-                            })?;
-                            dump_content_file.push_document(&document)?;
-                        }
+        // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
+        if let Some(content_file_uuid) = content_file {
+            if status == Status::Enqueued {
+                let content_file = file_store.get_update(content_file_uuid)?;
+
+                if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
+                    eprintln!("Dumping the enqueued tasks reading them in obkv format...");
+                    let reader =
+                        DocumentsBatchReader::from_reader(content_file).with_context(|| {
+                            format!("While reading content file {:?}", content_file_uuid)
+                        })?;
+                    let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
+                    while let Some(doc) = cursor.next_document().with_context(|| {
+                        format!("While iterating on content file {:?}", content_file_uuid)
+                    })? {
+                        dump_content_file
+                            .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
+                    }
+                } else {
+                    eprintln!("Dumping the enqueued tasks reading them in JSON stream format...");
+                    for document in
+                        serde_json::de::Deserializer::from_reader(content_file).into_iter()
+                    {
+                        let document = document.with_context(|| {
+                            format!("While reading content file {:?}", content_file_uuid)
+                        })?;
+                        dump_content_file.push_document(&document)?;
                     }
-
-                    dump_content_file.flush()?;
-                    count += 1;
                 }
+
+                dump_content_file.flush()?;
+                count_enqueued_tasks += 1;
             }
         }
-        dump_tasks.flush()?;
-
-        eprintln!("Successfully dumped {count} enqueued tasks!");
+        count_tasks += 1;
     }
+    dump_tasks.flush()?;
+    eprintln!(
+        "Successfully dumped {count_tasks} tasks including {count_enqueued_tasks} enqueued tasks!"
+    );
 
+    // 4. dump the batches
+    eprintln!("Dumping the batches");
+    let mut dump_batches = dump.create_batches_queue()?;
+    let mut count = 0;
+
+    for ret in all_batches.iter(&rtxn)? {
+        let (_, b) = ret?;
+        dump_batches.push_batch(&b)?;
+        count += 1;
+    }
+    dump_batches.flush()?;
+    eprintln!("Successfully dumped {count} batches!");
+
+    // 5. Dump the indexes
     eprintln!("Dumping the indexes...");
-
-    // 4. Dump the indexes
     let mut count = 0;
     for result in index_mapping.iter(&rtxn)? {
         let (uid, uuid) = result?;
@@ -339,14 +356,14 @@ fn export_a_dump(
         let fields_ids_map = index.fields_ids_map(&rtxn)?;
         let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
 
-        // 4.1. Dump the documents
+        // 5.1. Dump the documents
        for ret in index.all_documents(&rtxn)? {
             let (_id, doc) = ret?;
             let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
             index_dumper.push_document(&document)?;
         }
 
-        // 4.2. Dump the settings
+        // 5.2. Dump the settings
         let settings = meilisearch_types::settings::settings(
             &index,
             &rtxn,