From 83f3c5ec57f17665b50f0438c1ba0eaeaa2a3719 Mon Sep 17 00:00:00 2001
From: Tamo <tamo@meilisearch.com>
Date: Mon, 17 Oct 2022 17:04:52 +0200
Subject: [PATCH] flush the dump-writer only once everything has been inserted

---
 dump/src/lib.rs              |  3 +++
 dump/src/writer.rs           | 25 ++++++++++++++++++++++---
 index-scheduler/src/batch.rs | 11 +++++++----
 3 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/dump/src/lib.rs b/dump/src/lib.rs
index 4194e53a1..b0dfdce91 100644
--- a/dump/src/lib.rs
+++ b/dump/src/lib.rs
@@ -382,6 +382,7 @@ pub(crate) mod test {
         for document in &documents {
             index.push_document(document).unwrap();
         }
+        index.flush().unwrap();
         index.settings(&settings).unwrap();
 
         // ========== pushing the task queue
@@ -396,6 +397,7 @@ pub(crate) mod test {
                 }
             }
         }
+        task_queue.flush().unwrap();
 
         // ========== pushing the api keys
         let api_keys = create_test_api_keys();
@@ -404,6 +406,7 @@ pub(crate) mod test {
         for key in &api_keys {
             keys.push_key(key).unwrap();
         }
+        keys.flush().unwrap();
 
         // create the dump
         let mut file = tempfile::tempfile().unwrap();
diff --git a/dump/src/writer.rs b/dump/src/writer.rs
index abb270cb8..8bedd208e 100644
--- a/dump/src/writer.rs
+++ b/dump/src/writer.rs
@@ -87,6 +87,11 @@ impl KeyWriter {
         self.keys.write_all(b"\n")?;
         Ok(())
     }
+
+    pub fn flush(mut self) -> Result<()> {
+        self.keys.flush()?;
+        Ok(())
+    }
 }
 
 pub struct TaskWriter {
@@ -113,12 +118,16 @@ impl TaskWriter {
     pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
         self.queue.write_all(&serde_json::to_vec(task)?)?;
         self.queue.write_all(b"\n")?;
-        self.queue.flush()?;
 
         Ok(UpdateFile::new(
             self.update_files.join(format!("{}.jsonl", task.uid)),
         ))
     }
+
+    pub fn flush(mut self) -> Result<()> {
+        self.queue.flush()?;
+        Ok(())
+    }
 }
 
 pub struct UpdateFile {
@@ -135,7 +144,6 @@ impl UpdateFile {
         if let Some(writer) = self.writer.as_mut() {
             writer.write_all(&serde_json::to_vec(document)?)?;
             writer.write_all(b"\n")?;
-            writer.flush()?;
         } else {
             let file = File::create(&self.path).unwrap();
             self.writer = Some(BufWriter::new(file));
@@ -143,6 +151,13 @@ impl UpdateFile {
         }
         Ok(())
     }
+
+    pub fn flush(self) -> Result<()> {
+        if let Some(mut writer) = self.writer {
+            writer.flush()?;
+        }
+        Ok(())
+    }
 }
 
 pub struct IndexWriter {
@@ -167,8 +182,12 @@ impl IndexWriter {
     }
 
     pub fn push_document(&mut self, document: &Map<String, Value>) -> Result<()> {
-        self.documents.write_all(&serde_json::to_vec(document)?)?;
+        serde_json::to_writer(&mut self.documents, document)?;
         self.documents.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn flush(&mut self) -> Result<()> {
         self.documents.flush()?;
         Ok(())
     }
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index f4e8bd8b7..fbbe5d878 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -499,17 +499,18 @@ impl IndexScheduler {
                     unreachable!();
                 };
                 let dump = dump::DumpWriter::new(instance_uid.clone())?;
-                let mut dump_keys = dump.create_keys()?;
 
                 // 1. dump the keys
+                let mut dump_keys = dump.create_keys()?;
                 for key in keys {
                     dump_keys.push_key(key)?;
                 }
+                dump_keys.flush()?;
 
                 let rtxn = self.env.read_txn()?;
 
                 // 2. dump the tasks
-                let mut tasks = dump.create_tasks_queue()?;
+                let mut dump_tasks = dump.create_tasks_queue()?;
                 for ret in self.all_tasks.iter(&rtxn)? {
                     let (_, mut t) = ret?;
                     let status = t.status;
@@ -527,7 +528,7 @@ impl IndexScheduler {
                         t.started_at = Some(started_at);
                         t.finished_at = Some(finished_at);
                     }
-                    let mut dump_content_file = tasks.push_task(&t.into())?;
+                    let mut dump_content_file = dump_tasks.push_task(&t.into())?;
 
                     // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
                     if let Some(content_file) = content_file {
@@ -548,9 +549,11 @@ impl IndexScheduler {
                                     &documents_batch_index,
                                 )?)?;
                             }
+                            dump_content_file.flush()?;
                         }
                     }
                 }
+                dump_tasks.flush()?;
 
                 // TODO: maybe `self.indexes` could use this rtxn instead of creating its own
                 drop(rtxn);
@@ -585,8 +588,8 @@ impl IndexScheduler {
                 let file = File::create(path)?;
                 dump.persist_to(BufWriter::new(file))?;
 
+                // if we reached this step we can tell the scheduler we succeeded to dump ourselves.
                 task.status = Status::Succeeded;
-
                 Ok(vec![task])
             }
             Batch::IndexOperation(operation) => {