implement index dump

This commit is contained in:
Marin Postma 2021-05-24 18:16:35 +02:00
parent 7ad553670f
commit 4acbe8e473
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
8 changed files with 79 additions and 55 deletions

View File

@ -1,8 +1,11 @@
use std::{collections::{BTreeSet, HashSet}, marker::PhantomData};
use std::{collections::{BTreeSet, HashSet}, io::Write, marker::PhantomData, path::{Path, PathBuf}};
use std::ops::Deref;
use std::sync::Arc;
use std::fs::File;
use anyhow::{bail, Context};
use heed::RoTxn;
use indexmap::IndexMap;
use milli::obkv_to_json;
use serde_json::{Map, Value};
@ -38,7 +41,10 @@ where
impl Index {
pub fn settings(&self) -> anyhow::Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)
}
pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(&txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
@ -161,4 +167,57 @@ impl Index {
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
Ok(displayed_fields_ids)
}
pub fn dump(&self, path: PathBuf) -> anyhow::Result<()> {
// acquire write txn make sure any ongoing write is finnished before we start.
let txn = self.env.write_txn()?;
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
Ok(())
}
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
println!("dumping documents");
let document_file_path = path.as_ref().join("documents.jsonl");
let mut document_file = File::create(&document_file_path)?;
let documents = self.all_documents(txn)?;
let fields_ids_map = self.fields_ids_map(txn)?;
// dump documents
let mut json_map = IndexMap::new();
for document in documents {
let (_, reader) = document?;
for (fid, bytes) in reader.iter() {
if let Some(name) = fields_ids_map.name(fid) {
json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
}
}
serde_json::to_writer(&mut document_file, &json_map)?;
document_file.write(b"\n")?;
json_map.clear();
}
Ok(())
}
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
println!("dumping settings");
let meta_file_path = path.as_ref().join("meta.json");
let mut meta_file = File::create(&meta_file_path)?;
let settings = self.settings_txn(txn)?;
let json = serde_json::json!({
"settings": settings,
});
serde_json::to_writer(&mut meta_file, &json)?;
Ok(())
}
}

View File

@ -1,4 +1,4 @@
use std::path::{Path};
use std::path::Path;
use actix_web::web::Bytes;
use tokio::sync::{mpsc, oneshot};
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};

View File

@ -6,7 +6,7 @@ use async_stream::stream;
use futures::stream::StreamExt;
use heed::CompactionOption;
use log::debug;
use tokio::sync::mpsc;
use tokio::{fs, sync::mpsc};
use tokio::task::spawn_blocking;
use uuid::Uuid;
@ -126,13 +126,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Dump {
uid,
uuid,
path,
ret,
} => {
let _ = ret.send(self.handle_dump(&uid, uuid, path).await);
Dump { uuid, path, ret } => {
let _ = ret.send(self.handle_dump(uuid, path).await);
}
GetStats { uuid, ret } => {
let _ = ret.send(self.handle_get_stats(uuid).await);
@ -312,46 +307,17 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
/// documents and all the settings.
async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
use std::io::prelude::*;
use tokio::fs::create_dir_all;
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
create_dir_all(&path).await?;
let path = path.join(format!("indexes/index-{}/", uuid));
fs::create_dir_all(&path).await?;
if let Some(index) = self.store.get(uuid).await? {
let documents_path = path.join(uid).join("documents.jsonl");
let settings_path = path.join(uid).join("settings.json");
spawn_blocking(move || -> anyhow::Result<()> {
// first we dump all the documents
let file = File::create(documents_path)?;
let mut file = std::io::BufWriter::new(file);
// Get write txn to wait for ongoing write transaction before dump.
let txn = index.write_txn()?;
let fields_ids_map = index.fields_ids_map(&txn)?;
// we want to save **all** the fields in the dump.
let fields_to_dump: Vec<u8> = fields_ids_map.iter().map(|(id, _)| id).collect();
for document in index.all_documents(&txn)? {
let (_doc_id, document) = document?;
let json = milli::obkv_to_json(&fields_to_dump, &fields_ids_map, document)?;
file.write_all(serde_json::to_string(&json)?.as_bytes())?;
file.write_all(b"\n")?;
}
// then we dump all the settings
let file = File::create(settings_path)?;
let mut file = std::io::BufWriter::new(file);
let settings = index.settings()?;
file.write_all(serde_json::to_string(&settings)?.as_bytes())?;
file.write_all(b"\n")?;
Ok(())
})
.await??;
}
tokio::task::spawn_blocking(move || index.dump(path)).await??;
Ok(())
}

View File

@ -136,9 +136,9 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Dump { uid, uuid, path, ret };
let msg = IndexMsg::Dump { uuid, path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

View File

@ -61,7 +61,6 @@ pub enum IndexMsg {
ret: oneshot::Sender<IndexResult<()>>,
},
Dump {
uid: String,
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<IndexResult<()>>,

View File

@ -109,7 +109,7 @@ pub trait IndexActorHandle {
index_settings: IndexSettings,
) -> IndexResult<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats>;
}

View File

@ -642,7 +642,7 @@ impl UpdateStore {
let path = &path;
let mut stream = futures::stream::iter(uuids.iter())
.map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone()))
.map(|(uid, uuid)| handle.dump(*uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {

View File

@ -152,8 +152,8 @@ impl HeedUuidStore {
let entry = entry?;
let uuid = Uuid::from_slice(entry.1)?;
uuids.insert(uuid);
serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid }))?;
dump_file.write(b"\n").unwrap();
serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid
}))?; dump_file.write(b"\n").unwrap();
}
Ok(uuids)