fix the missing batch in the dumps in meilisearch and meilitools

Tamo 2025-02-04 11:13:29 +01:00
parent 78867b6852
commit 73e8900a9b
GPG Key ID: 20CD8020AFA88D69
11 changed files with 367 additions and 70 deletions

View File

@@ -10,8 +10,10 @@ dump
├── instance-uid.uuid
├── keys.jsonl
├── metadata.json
├── tasks
│   ├── update_files
│   │   └── [task_id].jsonl
│   └── queue.jsonl
└── batches
    └── queue.jsonl
```

View File

@@ -228,6 +228,7 @@ pub(crate) mod test {
use big_s::S;
use maplit::{btreemap, btreeset};
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
use meilisearch_types::facet_values_sort::FacetValuesSort;
use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
use meilisearch_types::index_uid_pattern::IndexUidPattern;
@@ -235,7 +236,8 @@ pub(crate) mod test {
use meilisearch_types::milli;
use meilisearch_types::milli::update::Setting;
use meilisearch_types::settings::{Checked, FacetingSettings, Settings};
use meilisearch_types::task_view::DetailsView;
use meilisearch_types::tasks::{Details, Kind, Status};
use serde_json::{json, Map, Value};
use time::macros::datetime;
use uuid::Uuid;
@@ -305,6 +307,30 @@ pub(crate) mod test {
    settings.check()
}

pub fn create_test_batches() -> Vec<Batch> {
    vec![Batch {
        uid: 0,
        details: DetailsView {
            received_documents: Some(12),
            indexed_documents: Some(Some(10)),
            ..DetailsView::default()
        },
        progress: None,
        stats: BatchStats {
            total_nb_tasks: 1,
            status: maplit::btreemap! { Status::Succeeded => 1 },
            types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 },
            index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
        },
        enqueued_at: Some(BatchEnqueuedAt {
            earliest: datetime!(2022-11-11 0:00 UTC),
            oldest: datetime!(2022-11-11 0:00 UTC),
        }),
        started_at: datetime!(2022-11-20 0:00 UTC),
        finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
    }]
}

pub fn create_test_tasks() -> Vec<(TaskDump, Option<Vec<Document>>)> {
    vec![
        (
@@ -427,6 +453,15 @@ pub(crate) mod test {
index.flush().unwrap();
index.settings(&settings).unwrap();

// ========== pushing the batch queue
let batches = create_test_batches();
let mut batch_queue = dump.create_batches_queue().unwrap();
for batch in &batches {
    batch_queue.push_batch(batch).unwrap();
}
batch_queue.flush().unwrap();

// ========== pushing the task queue
let tasks = create_test_tasks();

View File

@@ -102,6 +102,13 @@ impl DumpReader {
    }
}

pub fn batches(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Batch>> + '_>> {
    match self {
        DumpReader::Current(current) => Ok(current.batches()),
        DumpReader::Compat(_compat) => Ok(Box::new(std::iter::empty())),
    }
}

pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Key>> + '_>> {
    match self {
        DumpReader::Current(current) => Ok(current.keys()),
@@ -227,6 +234,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2024-05-16 15:51:34.151044 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -348,6 +359,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2023-07-06 7:10:27.21958 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None");

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -412,6 +427,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00");
insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d");

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -492,6 +511,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-06 12:53:49.131989609 +00:00:00");
insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d");

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -569,6 +592,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00");
assert_eq!(dump.instance_uid().unwrap(), None);

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -662,6 +689,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-09 20:27:59.904096267 +00:00:00");
assert_eq!(dump.instance_uid().unwrap(), None);

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -755,6 +786,10 @@ pub(crate) mod test {
insta::assert_snapshot!(dump.date().unwrap(), @"2023-01-30 16:26:09.247261 +00:00:00");
assert_eq!(dump.instance_uid().unwrap(), None);

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
@@ -831,6 +866,10 @@ pub(crate) mod test {
assert_eq!(dump.date(), None);
assert_eq!(dump.instance_uid().unwrap(), None);

// batches didn't exist at the time
let batches = dump.batches().unwrap().collect::<Result<Vec<_>>>().unwrap();
meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]");

// tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();

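For orientation only (this note and sketch are not part of the commit): a minimal example of draining the new `DumpReader::batches` iterator shown above. The helper name and the `dump::Result` alias are assumptions; `batches()` itself and the `Batch` type come from this diff.

```rust
// Hypothetical helper, sketched around the `batches()` method added above.
use dump::DumpReader;
use meilisearch_types::batches::Batch;

fn collect_dumped_batches(dump: &mut DumpReader) -> dump::Result<Vec<Batch>> {
    // Dumps written before v1.13 have no `batches/queue.jsonl`,
    // so for them this iterator is simply empty.
    dump.batches()?.collect()
}
```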
View File

@@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked;
pub type Unchecked = meilisearch_types::settings::Unchecked;

pub type Task = crate::TaskDump;
pub type Batch = meilisearch_types::batches::Batch;
pub type Key = meilisearch_types::keys::Key;
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
pub type Network = meilisearch_types::features::Network;
@@ -49,6 +50,7 @@ pub struct V6Reader {
    instance_uid: Option<Uuid>,
    metadata: Metadata,
    tasks: BufReader<File>,
    batches: Option<BufReader<File>>,
    keys: BufReader<File>,
    features: Option<RuntimeTogglableFeatures>,
    network: Option<Network>,
@@ -79,6 +81,12 @@ impl V6Reader {
        } else {
            None
        };

        let batches = match File::open(dump.path().join("batches").join("queue.jsonl")) {
            Ok(file) => Some(BufReader::new(file)),
            // The batch file was only introduced in v1.13; anything prior to that won't have batches.
            Err(err) if err.kind() == ErrorKind::NotFound => None,
            Err(e) => return Err(e.into()),
        };

        let network_file = match fs::read(dump.path().join("network.json")) {
            Ok(network_file) => Some(network_file),
@@ -101,6 +109,7 @@ impl V6Reader {
        metadata: serde_json::from_reader(&*meta_file)?,
        instance_uid,
        tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
        batches,
        keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
        features,
        network,
@@ -144,7 +153,7 @@ impl V6Reader {
    &mut self,
) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
    Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
        let task: Task = serde_json::from_str(&line?)?;

        let update_file_path = self
            .dump
@@ -156,8 +165,7 @@ impl V6Reader {
        if update_file_path.exists() {
            Ok((
                task,
                Some(Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>),
            ))
        } else {
            Ok((task, None))
@@ -165,6 +173,16 @@ impl V6Reader {
    }))
}

pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
    match self.batches.as_mut() {
        Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
            let batch = serde_json::from_str(&line?)?;
            Ok(batch)
        })),
        None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
    }
}

pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> {
    Box::new(
        (&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),

View File

@@ -4,6 +4,7 @@ use std::path::PathBuf;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::batches::Batch;
use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
use meilisearch_types::keys::Key;
use meilisearch_types::settings::{Checked, Settings};
@@ -54,6 +55,10 @@ impl DumpWriter {
        TaskWriter::new(self.dir.path().join("tasks"))
    }

    pub fn create_batches_queue(&self) -> Result<BatchWriter> {
        BatchWriter::new(self.dir.path().join("batches"))
    }

    pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
        Ok(std::fs::write(
            self.dir.path().join("experimental-features.json"),
@@ -130,6 +135,30 @@ impl TaskWriter {
    }
}

pub struct BatchWriter {
    queue: BufWriter<File>,
}

impl BatchWriter {
    pub(crate) fn new(path: PathBuf) -> Result<Self> {
        std::fs::create_dir(&path)?;
        let queue = File::create(path.join("queue.jsonl"))?;
        Ok(BatchWriter { queue: BufWriter::new(queue) })
    }

    /// Pushes batches in the dump.
    pub fn push_batch(&mut self, batch: &Batch) -> Result<()> {
        self.queue.write_all(&serde_json::to_vec(batch)?)?;
        self.queue.write_all(b"\n")?;
        Ok(())
    }

    pub fn flush(mut self) -> Result<()> {
        self.queue.flush()?;
        Ok(())
    }
}

pub struct UpdateFile {
    path: PathBuf,
    writer: Option<BufWriter<File>>,
@@ -209,8 +238,8 @@ pub(crate) mod test {
    use super::*;
    use crate::reader::Document;
    use crate::test::{
        create_test_api_keys, create_test_batches, create_test_documents, create_test_dump,
        create_test_instance_uid, create_test_settings, create_test_tasks,
    };

    fn create_directory_hierarchy(dir: &Path) -> String {
@@ -285,8 +314,10 @@ pub(crate) mod test {
    let dump_path = dump.path();

    // ==== checking global file hierarchy (we want to be sure there aren't too many files or too few)
    insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r"
    .
    ---- batches/
    ---- queue.jsonl
    ---- indexes/
    ---- doggos/
    ---- documents.jsonl
@@ -301,7 +332,7 @@ pub(crate) mod test {
    ---- keys.jsonl
    ---- metadata.json
    ---- network.json
    ");

    // ==== checking the top level infos
    let metadata = fs::read_to_string(dump_path.join("metadata.json")).unwrap();
@@ -354,6 +385,16 @@ pub(crate) mod test {
        }
    }

    // ==== checking the batch queue
    let batches_queue = fs::read_to_string(dump_path.join("batches/queue.jsonl")).unwrap();
    for (batch, expected) in batches_queue.lines().zip(create_test_batches()) {
        let mut batch = serde_json::from_str::<Batch>(batch).unwrap();
        if batch.details.settings == Some(Box::new(Settings::<Unchecked>::default())) {
            batch.details.settings = None;
        }
        assert_eq!(batch, expected, "{batch:#?}{expected:#?}");
    }

    // ==== checking the keys
    let keys = fs::read_to_string(dump_path.join("keys.jsonl")).unwrap();
    for (key, expected) in keys.lines().zip(create_test_api_keys()) {

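As a usage note (again not part of the commit), the writer side added above could be driven roughly like this. The helper name and the `dump::Result` alias are assumptions; `create_batches_queue`, `push_batch`, and `flush` are the API introduced in this file.

```rust
// Hypothetical helper, sketched around the BatchWriter API added above.
use dump::DumpWriter;
use meilisearch_types::batches::Batch;

fn dump_all_batches(dump: &DumpWriter, batches: &[Batch]) -> dump::Result<()> {
    // Creates the `batches/` directory and its `queue.jsonl` inside the dump.
    let mut queue = dump.create_batches_queue()?;
    for batch in batches {
        // One JSON-encoded batch per line, mirroring the tasks queue format.
        queue.push_batch(batch)?;
    }
    // `flush` consumes the writer and forces the buffered lines to disk.
    queue.flush()
}
```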
View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::io;

use dump::{KindDump, TaskDump, UpdateFile};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::RwTxn;
use meilisearch_types::milli;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
@@ -14,9 +15,15 @@ pub struct Dump<'a> {
    index_scheduler: &'a IndexScheduler,
    wtxn: RwTxn<'a>,

    batch_to_task_mapping: HashMap<BatchId, RoaringBitmap>,

    indexes: HashMap<String, RoaringBitmap>,
    statuses: HashMap<Status, RoaringBitmap>,
    kinds: HashMap<Kind, RoaringBitmap>,

    batch_indexes: HashMap<String, RoaringBitmap>,
    batch_statuses: HashMap<Status, RoaringBitmap>,
    batch_kinds: HashMap<Kind, RoaringBitmap>,
}

impl<'a> Dump<'a> {
@@ -27,12 +34,72 @@ impl<'a> Dump<'a> {
    Ok(Dump {
        index_scheduler,
        wtxn,
        batch_to_task_mapping: HashMap::new(),
        indexes: HashMap::new(),
        statuses: HashMap::new(),
        kinds: HashMap::new(),
        batch_indexes: HashMap::new(),
        batch_statuses: HashMap::new(),
        batch_kinds: HashMap::new(),
    })
}

/// Register a new batch coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_batch(&mut self, batch: Batch) -> Result<()> {
    self.index_scheduler.queue.batches.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?;
    if let Some(enqueued_at) = batch.enqueued_at {
        utils::insert_task_datetime(
            &mut self.wtxn,
            self.index_scheduler.queue.batches.enqueued_at,
            enqueued_at.earliest,
            batch.uid,
        )?;
        utils::insert_task_datetime(
            &mut self.wtxn,
            self.index_scheduler.queue.batches.enqueued_at,
            enqueued_at.oldest,
            batch.uid,
        )?;
    }
    utils::insert_task_datetime(
        &mut self.wtxn,
        self.index_scheduler.queue.batches.started_at,
        batch.started_at,
        batch.uid,
    )?;
    if let Some(finished_at) = batch.finished_at {
        utils::insert_task_datetime(
            &mut self.wtxn,
            self.index_scheduler.queue.batches.finished_at,
            finished_at,
            batch.uid,
        )?;
    }

    for index in batch.stats.index_uids.keys() {
        match self.batch_indexes.get_mut(index) {
            Some(bitmap) => {
                bitmap.insert(batch.uid);
            }
            None => {
                let mut bitmap = RoaringBitmap::new();
                bitmap.insert(batch.uid);
                self.batch_indexes.insert(index.to_string(), bitmap);
            }
        };
    }

    for status in batch.stats.status.keys() {
        self.batch_statuses.entry(*status).or_default().insert(batch.uid);
    }
    for kind in batch.stats.types.keys() {
        self.batch_kinds.entry(*kind).or_default().insert(batch.uid);
    }

    Ok(())
}

/// Register a new task coming from a dump in the scheduler.
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(
@@ -149,6 +216,9 @@ impl<'a> Dump<'a> {
    };

    self.index_scheduler.queue.tasks.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
    if let Some(batch_id) = task.batch_uid {
        self.batch_to_task_mapping.entry(batch_id).or_default().insert(task.uid);
    }

    for index in task.indexes() {
        match self.indexes.get_mut(index) {
@@ -198,6 +268,14 @@ impl<'a> Dump<'a> {
/// Commit all the changes and exit the importing dump state
pub fn finish(mut self) -> Result<()> {
    for (batch_id, task_ids) in self.batch_to_task_mapping {
        self.index_scheduler.queue.batch_to_tasks_mapping.put(
            &mut self.wtxn,
            &batch_id,
            &task_ids,
        )?;
    }

    for (index, bitmap) in self.indexes {
        self.index_scheduler.queue.tasks.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
    }
@@ -208,6 +286,16 @@ impl<'a> Dump<'a> {
        self.index_scheduler.queue.tasks.put_kind(&mut self.wtxn, kind, &bitmap)?;
    }

    for (index, bitmap) in self.batch_indexes {
        self.index_scheduler.queue.batches.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
    }
    for (status, bitmap) in self.batch_statuses {
        self.index_scheduler.queue.batches.put_status(&mut self.wtxn, status, &bitmap)?;
    }
    for (kind, bitmap) in self.batch_kinds {
        self.index_scheduler.queue.batches.put_kind(&mut self.wtxn, kind, &bitmap)?;
    }

    self.wtxn.commit()?;

    self.index_scheduler.scheduler.wake_up.signal();

View File

@@ -96,6 +96,7 @@ make_enum_progress! {
    StartTheDumpCreation,
    DumpTheApiKeys,
    DumpTheTasks,
    DumpTheBatches,
    DumpTheIndexes,
    DumpTheExperimentalFeatures,
    CompressTheDump,

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::fs::File;
use std::io::BufWriter;
use std::sync::atomic::Ordering;
@@ -11,7 +12,9 @@ use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use time::macros::format_description;
use time::OffsetDateTime;

use crate::processing::{
    AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress,
};
use crate::{Error, IndexScheduler, Result};

impl IndexScheduler {
@@ -102,7 +105,40 @@ impl IndexScheduler {
    }
    dump_tasks.flush()?;

    // 3. Dump the batches
    progress.update_progress(DumpCreationProgress::DumpTheBatches);
    let mut dump_batches = dump.create_batches_queue()?;

    let (atomic, update_batch_progress) =
        AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
    progress.update_progress(update_batch_progress);

    for ret in self.queue.batches.all_batches.iter(&rtxn)? {
        if self.scheduler.must_stop_processing.get() {
            return Err(Error::AbortedTask);
        }
        let (_, mut b) = ret?;
        // In the case we're dumping ourselves, we want to be marked as finished
        // so we don't loop over ourselves indefinitely.
        if b.uid == task.uid {
            let finished_at = OffsetDateTime::now_utc();

            // We're going to fake the date because we don't know if everything is going to go well.
            // But we need to dump the task as finished and successful.
            // If something fails, everything will be set appropriately in the end.
            let mut statuses = BTreeMap::new();
            statuses.insert(Status::Succeeded, b.stats.total_nb_tasks);
            b.stats.status = statuses;

            b.finished_at = Some(finished_at);
        }

        dump_batches.push_batch(&b)?;
        atomic.fetch_add(1, Ordering::Relaxed);
    }
    dump_batches.flush()?;

    // 4. Dump the indexes
    progress.update_progress(DumpCreationProgress::DumpTheIndexes);
    let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
    let mut count = 0;
@@ -142,7 +178,7 @@ impl IndexScheduler {
            let documents = index
                .all_documents(&rtxn)
                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
            // 4.1. Dump the documents
            for ret in documents {
                if self.scheduler.must_stop_processing.get() {
                    return Err(Error::AbortedTask);
@@ -204,7 +240,7 @@ impl IndexScheduler {
                atomic.fetch_add(1, Ordering::Relaxed);
            }

            // 4.2. Dump the settings
            let settings = meilisearch_types::settings::settings(
                index,
                &rtxn,
@@ -215,7 +251,7 @@ impl IndexScheduler {
            Ok(())
        })?;

        // 5. Dump experimental feature settings
        progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
        let features = self.features().runtime_features();
        dump.create_experimental_features(features)?;

View File

@@ -30,7 +30,21 @@ pub struct Batch {
    pub enqueued_at: Option<BatchEnqueuedAt>,
}

impl PartialEq for Batch {
    fn eq(&self, other: &Self) -> bool {
        let Self { uid, progress, details, stats, started_at, finished_at, enqueued_at } = self;

        *uid == other.uid
            && progress.is_none() == other.progress.is_none()
            && details == &other.details
            && stats == &other.stats
            && started_at == &other.started_at
            && finished_at == &other.finished_at
            && enqueued_at == &other.enqueued_at
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BatchEnqueuedAt {
    #[serde(with = "time::serde::rfc3339")]
    pub earliest: OffsetDateTime,
@@ -38,7 +52,7 @@ pub struct BatchEnqueuedAt {
    pub oldest: OffsetDateTime,
}

#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct BatchStats {

View File

@@ -511,9 +511,15 @@ fn import_dump(
        index_scheduler.refresh_index_stats(&uid)?;
    }

    // 5. Import the queue
    let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
    // 5.1. Import the batches
    for ret in dump_reader.batches()? {
        let batch = ret?;
        index_scheduler_dump.register_dumped_batch(batch)?;
    }

    // 5.2. Import the tasks
    for ret in dump_reader.tasks()? {
        let (task, file) = ret?;
        index_scheduler_dump.register_dumped_task(task, file)?;

View File

@@ -8,6 +8,7 @@ use clap::{Parser, Subcommand};
use dump::{DumpWriter, IndexMetadata};
use file_store::FileStore;
use meilisearch_auth::AuthController;
use meilisearch_types::batches::Batch;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{
    CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
@@ -255,70 +256,86 @@ fn export_a_dump(
    eprintln!("Successfully dumped {count} keys!");

    eprintln!("Dumping the queue");
    let rtxn = env.read_txn()?;
    let all_tasks: Database<BEU32, SerdeJson<Task>> =
        try_opening_database(&env, &rtxn, "all-tasks")?;
    let all_batches: Database<BEU32, SerdeJson<Batch>> =
        try_opening_database(&env, &rtxn, "all-batches")?;
    let index_mapping: Database<Str, UuidCodec> =
        try_opening_database(&env, &rtxn, "index-mapping")?;

    eprintln!("Dumping the tasks");
    let mut dump_tasks = dump.create_tasks_queue()?;
    let mut count_tasks = 0;
    let mut count_enqueued_tasks = 0;
    for ret in all_tasks.iter(&rtxn)? {
        let (_, t) = ret?;
        let status = t.status;
        let content_file = t.content_uuid();

        if status == Status::Enqueued && skip_enqueued_tasks {
            continue;
        }

        let mut dump_content_file = dump_tasks.push_task(&t.into())?;

        // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
        if let Some(content_file_uuid) = content_file {
            if status == Status::Enqueued {
                let content_file = file_store.get_update(content_file_uuid)?;

                if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
                    eprintln!("Dumping the enqueued tasks reading them in obkv format...");
                    let reader =
                        DocumentsBatchReader::from_reader(content_file).with_context(|| {
                            format!("While reading content file {:?}", content_file_uuid)
                        })?;
                    let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
                    while let Some(doc) = cursor.next_document().with_context(|| {
                        format!("While iterating on content file {:?}", content_file_uuid)
                    })? {
                        dump_content_file
                            .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
                    }
                } else {
                    eprintln!("Dumping the enqueued tasks reading them in JSON stream format...");
                    for document in
                        serde_json::de::Deserializer::from_reader(content_file).into_iter()
                    {
                        let document = document.with_context(|| {
                            format!("While reading content file {:?}", content_file_uuid)
                        })?;
                        dump_content_file.push_document(&document)?;
                    }
                }

                dump_content_file.flush()?;
                count_enqueued_tasks += 1;
            }
        }
        count_tasks += 1;
    }
    dump_tasks.flush()?;
    eprintln!(
        "Successfully dumped {count_tasks} tasks including {count_enqueued_tasks} enqueued tasks!"
    );

    // 4. dump the batches
    eprintln!("Dumping the batches");
    let mut dump_batches = dump.create_batches_queue()?;
    let mut count = 0;
    for ret in all_batches.iter(&rtxn)? {
        let (_, b) = ret?;
        dump_batches.push_batch(&b)?;
        count += 1;
    }
    dump_batches.flush()?;
    eprintln!("Successfully dumped {count} batches!");

    // 5. Dump the indexes
    eprintln!("Dumping the indexes...");
    let mut count = 0;
    for result in index_mapping.iter(&rtxn)? {
        let (uid, uuid) = result?;
@@ -339,14 +356,14 @@ fn export_a_dump(
        let fields_ids_map = index.fields_ids_map(&rtxn)?;
        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();

        // 5.1. Dump the documents
        for ret in index.all_documents(&rtxn)? {
            let (_id, doc) = ret?;
            let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
            index_dumper.push_document(&document)?;
        }

        // 5.2. Dump the settings
        let settings = meilisearch_types::settings::settings(
            &index,
            &rtxn,