load index dump

Marin Postma 2021-05-26 22:52:06 +02:00
parent e818c33fec
commit b924e897f1
No known key found for this signature in database
GPG key ID: D5241F0C0C865F30
11 changed files with 261 additions and 279 deletions

View file

@@ -1,25 +1,27 @@
use std::{fs::File, io::BufReader, marker::PhantomData, path::Path};
use std::path::Path;
use anyhow::Context;
use chrono::{DateTime, Utc};
use log::info;
use serde::{Deserialize, Serialize};
use crate::index_controller::uuid_resolver::store::UuidStore;
use crate::{index::Index, index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}, option::IndexerOpts};
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataV2<U> {
pub struct MetadataV2 {
db_version: String,
index_db_size: usize,
update_db_size: usize,
index_db_size: u64,
update_db_size: u64,
dump_date: DateTime<Utc>,
_pth: PhantomData<U>,
}
impl<U> MetadataV2<U>
where U: UuidStore,
{
pub fn load_dump(self, src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
impl MetadataV2 {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V2",
self.dump_date, self.db_version
@@ -32,148 +34,26 @@ where U: UuidStore,
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
self.load_index_resolver(&src, tmp_dst.path())?;
load_updates(&src, tmp_dst.path())?;
load_indexes(&src, tmp_dst.path())?;
Ok(())
}
fn load_index_resolver(
&self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
) -> anyhow::Result<()> {
info!("Loading index database.");
let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
std::fs::create_dir_all(&uuid_resolver_path)?;
HeedUuidStore::load_dump(src.as_ref(), tmp_dst.as_ref())?;
U::load_dump(src.as_ref(), dst.as_ref())?;
info!("Loading updates.");
UpdateStore::load_dump(&src, &tmp_dst.as_ref(), self.update_db_size)?;
info!("Loading indexes");
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
for index in indexes {
let index = index?;
Index::load_dump(&index.path(), &dst, self.index_db_size, indexing_options)?;
}
// Persist and atomically rename the db
let persisted_dump = tmp_dst.into_path();
std::fs::rename(&persisted_dump, &dst)?;
Ok(())
}
}
fn load_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
info!("Loading updates.");
todo!()
}
fn load_indexes(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
info!("Loading indexes");
todo!()
}
// Extract Settings from `settings.json` file present at provided `dir_path`
//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
//let path = dir_path.join("settings.json");
//let file = File::open(path)?;
//let reader = BufReader::new(file);
//let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
//Ok(metadata.check())
//}
//pub fn import_dump(
//_db_size: usize,
//update_db_size: usize,
//_uuid: Uuid,
//dump_path: impl AsRef<Path>,
//db_path: impl AsRef<Path>,
//_primary_key: Option<&str>,
//) -> anyhow::Result<()> {
//info!("Dump import started.");
//info!("Importing outstanding updates...");
//import_updates(&dump_path, &db_path, update_db_size)?;
//info!("done importing updates");
//Ok(())
////let index_path = db_path.join(&format!("indexes/index-{}", uuid));
////std::fs::create_dir_all(&index_path)?;
////let mut options = EnvOpenOptions::new();
////options.map_size(size);
////let index = milli::Index::new(options, index_path)?;
////let index = Index(Arc::new(index));
////let mut txn = index.write_txn()?;
////info!("importing the settings...");
////// extract `settings.json` file and import content
////let settings = import_settings(&dump_path)?;
////let update_builder = UpdateBuilder::new(0);
////index.update_settings_txn(&mut txn, &settings, update_builder)?;
////// import the documents in the index
////let update_builder = UpdateBuilder::new(1);
////let file = File::open(&dump_path.join("documents.jsonl"))?;
////let reader = std::io::BufReader::new(file);
////info!("importing the documents...");
////// TODO: TAMO: currently we ignore any error caused by the importation of the documents because
////// if there is no documents nor primary key it'll throw an anyhow error, but we must remove
////// this before the merge on main
////index.update_documents_txn(
////&mut txn,
////UpdateFormat::JsonStream,
////IndexDocumentsMethod::ReplaceDocuments,
////Some(reader),
////update_builder,
////primary_key,
////)?;
////txn.commit()?;
////// the last step: we extract the original milli::Index and close it
////Arc::try_unwrap(index.0)
////.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
////.unwrap()
////.prepare_for_closing()
////.wait();
////info!("importing the updates...");
////import_updates(dump_path, db_path)
//}
//fn import_updates(
//src_path: impl AsRef<Path>,
//dst_path: impl AsRef<Path>,
//_update_db_size: usize
//) -> anyhow::Result<()> {
//let dst_update_path = dst_path.as_ref().join("updates");
//std::fs::create_dir_all(&dst_update_path)?;
//let dst_update_files_path = dst_update_path.join("update_files");
//std::fs::create_dir_all(&dst_update_files_path)?;
//let options = EnvOpenOptions::new();
//let (update_store, _) = UpdateStore::create(options, &dst_update_path)?;
//let src_update_path = src_path.as_ref().join("updates");
//let src_update_files_path = src_update_path.join("update_files");
//let update_data = File::open(&src_update_path.join("data.jsonl"))?;
//let mut update_data = BufReader::new(update_data);
//let mut wtxn = update_store.env.write_txn()?;
//let mut line = String::new();
//loop {
//match update_data.read_line(&mut line) {
//Ok(_) => {
//let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?;
//if let Some(path) = update.content_path_mut() {
//let dst_file_path = dst_update_files_path.join(&path);
//let src_file_path = src_update_files_path.join(&path);
//*path = dst_update_files_path.join(&path);
//std::fs::copy(src_file_path, dst_file_path)?;
//}
//update_store.register_raw_updates(&mut wtxn, update, uuid)?;
//}
//_ => break,
//}
//}
//wtxn.commit()?;
//Ok(())
//}
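The rewritten load_dump above assembles the whole database in a temporary directory created next to the destination and only renames it into place once every step has succeeded, so an interrupted import cannot leave a half-written database at dst. A minimal sketch of that idiom, assuming the anyhow and tempfile crates used above (the helper name and body are illustrative):

    use std::{fs, path::Path};

    // Sketch: build into a temp dir on the same filesystem as `dst`, then
    // publish it with a single rename. `tempdir_in` keeps the temp dir on
    // the same device, so the rename stays atomic instead of degrading
    // into a copy.
    fn load_atomically(dst: &Path) -> anyhow::Result<()> {
        let dst_dir = dst.parent().unwrap_or_else(|| Path::new("."));
        let tmp = tempfile::tempdir_in(dst_dir)?;

        // ... load uuids, updates, and indexes under `tmp.path()` ...

        // `into_path` persists the directory (it is no longer deleted on
        // drop); the rename then makes the import visible all at once.
        let persisted = tmp.into_path();
        fs::rename(persisted, dst)?;
        Ok(())
    }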

View file

@@ -1,8 +1,3 @@
mod actor;
mod handle_impl;
mod message;
mod loaders;
use std::{fs::File, path::Path};
use log::error;
@@ -18,6 +13,15 @@ pub use actor::DumpActor;
pub use handle_impl::*;
pub use message::DumpMsg;
use crate::option::IndexerOpts;
use super::uuid_resolver::store::UuidStore;
mod actor;
mod handle_impl;
mod loaders;
mod message;
pub type DumpResult<T> = std::result::Result<T, DumpError>;
#[derive(Error, Debug)]
@@ -117,11 +121,12 @@ impl DumpInfo {
}
}
pub fn load_dump(
pub fn load_dump<U: UuidStore>(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
_index_db_size: u64,
_update_db_size: u64,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
let meta_path = src_path.as_ref().join("metadata.json");
let mut meta_file = File::open(&meta_path)?;
@@ -129,7 +134,7 @@ pub fn load_dump(
match meta {
Metadata::V1 { meta } => meta.load_dump(src_path, dst_path)?,
Metadata::V2 { meta } => meta.load_dump(src_path, dst_path)?,
Metadata::V2 { meta } => meta.load_dump(src_path.as_ref(), dst_path.as_ref(), indexer_opts)?,
}
Ok(())
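The dispatch above hinges on the dump's metadata file carrying a version tag that selects the right loader. A hedged sketch of how such an envelope can be modeled with serde; the dumpVersion tag name and the exact fields are assumptions for illustration, not the crate's confirmed on-disk format:

    use serde::Deserialize;
    use std::{fs::File, io::BufReader, path::Path};

    #[derive(Deserialize)]
    struct MetadataV2 {
        db_version: String,
        index_db_size: u64,
        update_db_size: u64,
    }

    // Hypothetical internally tagged envelope; `dumpVersion` is assumed.
    #[derive(Deserialize)]
    #[serde(tag = "dumpVersion")]
    enum Metadata {
        V1 { meta: serde_json::Value },
        V2 { meta: MetadataV2 },
    }

    fn read_metadata(path: &Path) -> anyhow::Result<Metadata> {
        let file = File::open(path)?;
        Ok(serde_json::from_reader(BufReader::new(file))?)
    }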

View file

@@ -10,9 +10,9 @@ use tokio::{fs, sync::mpsc};
use tokio::task::spawn_blocking;
use uuid::Uuid;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings, update_handler::UpdateHandler};
use crate::index_controller::{
get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed,
get_arc_ownership_blocking, Failed, IndexStats, Processed,
Processing,
};
use crate::option::IndexerOpts;

View file

@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use heed::EnvOpenOptions;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
@@ -48,7 +47,7 @@ impl IndexStore for MapIndexStore {
let index_size = self.index_size;
let index = spawn_blocking(move || -> IndexResult<Index> {
let index = open_index(&path, index_size)?;
let index = Index::open(path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
index.put_primary_key(&mut txn, &primary_key)?;
@@ -76,8 +75,7 @@ impl IndexStore for MapIndexStore {
}
let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size))
.await??;
let index = spawn_blocking(move || Index::open(path, index_size)).await??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
@@ -91,11 +89,3 @@ impl IndexStore for MapIndexStore {
Ok(index)
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> {
std::fs::create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path)?;
Ok(Index(Arc::new(index)))
}
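The deleted open_index helper and the Index::open that replaces it wrap the same heed/LMDB idiom: ensure the directory exists, size the memory map, then open the environment. A sketch of that idiom in plain heed, under the assumption that milli::Index::new performs the equivalent internally:

    use heed::EnvOpenOptions;
    use std::path::Path;

    // Sketch: LMDB sizes its memory map up front; `map_size` is an upper
    // bound on how large the database may grow, not an eager allocation.
    fn open_env(path: &Path, map_size: usize) -> anyhow::Result<heed::Env> {
        std::fs::create_dir_all(path)?;
        let mut options = EnvOpenOptions::new();
        options.map_size(map_size);
        Ok(options.open(path)?)
    }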

View file

@@ -29,7 +29,6 @@ mod dump_actor;
mod index_actor;
mod snapshot;
mod update_actor;
mod update_handler;
mod updates;
mod uuid_resolver;

View file

@@ -1,12 +1,7 @@
use std::{
collections::HashSet,
fs::{copy, create_dir_all, File},
io::Write,
path::{Path, PathBuf},
};
use std::{collections::HashSet, fs::{copy, create_dir_all, File}, io::{BufRead, BufReader, Write}, path::{Path, PathBuf}};
use anyhow::Context;
use heed::RoTxn;
use heed::{EnvOpenOptions, RoTxn};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@@ -15,7 +10,7 @@ use super::UpdateStore;
use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus};
#[derive(Serialize, Deserialize)]
pub struct UpdateEntry {
struct UpdateEntry {
uuid: Uuid,
update: UpdateStatus,
}
@@ -121,6 +116,48 @@ impl UpdateStore {
Ok(())
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>, db_size: u64) -> anyhow::Result<()> {
let dst_updates_path = dst.as_ref().join("updates/");
create_dir_all(&dst_updates_path)?;
let dst_update_files_path = dst_updates_path.join("update_files/");
create_dir_all(&dst_update_files_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(db_size as usize);
let (store, _) = UpdateStore::new(options, &dst_updates_path)?;
let src_update_path = src.as_ref().join("updates");
let src_update_files_path = src_update_path.join("update_files");
let update_data = File::open(&src_update_path.join("data.jsonl"))?;
let mut update_data = BufReader::new(update_data);
let mut wtxn = store.env.write_txn()?;
let mut line = String::new();
loop {
match update_data.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?;
if let Some(path) = update.content_path_mut() {
let dst_file_path = dst_update_files_path.join(&path);
let src_file_path = src_update_files_path.join(&path);
*path = dst_update_files_path.join(&path);
std::fs::copy(src_file_path, dst_file_path)?;
}
store.register_raw_updates(&mut wtxn, update, uuid)?;
}
_ => break,
}
line.clear();
}
wtxn.commit()?;
Ok(())
}
}
async fn dump_indexes(uuids: &HashSet<Uuid>, handle: impl IndexActorHandle, path: impl AsRef<Path>) -> anyhow::Result<()> {
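The read loop in load_dump above is the standard buffered-JSONL pattern, and it fixes two pitfalls visible in the commented-out predecessor: Ok(0) is matched explicitly (read_line returns Ok(0) only at end of file, so a catch-all Ok(_) arm spins forever re-parsing stale data), and the buffer is cleared after each record so lines do not accumulate. A self-contained sketch of the pattern:

    use std::io::{BufRead, BufReader, Read};

    // Sketch: apply `f` to each line of a JSONL stream.
    fn for_each_line<R: Read>(
        src: R,
        mut f: impl FnMut(&str) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        let mut reader = BufReader::new(src);
        let mut line = String::new();
        loop {
            match reader.read_line(&mut line) {
                Ok(0) => break, // EOF: zero bytes were read
                Ok(_) => f(line.trim_end())?,
                Err(e) => return Err(e.into()),
            }
            line.clear(); // reuse the buffer for the next record
        }
        Ok(())
    }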

View file

@@ -100,7 +100,7 @@ pub struct UpdateStore {
}
impl UpdateStore {
pub fn create(
fn new(
mut options: EnvOpenOptions,
path: impl AsRef<Path>,
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
@@ -114,7 +114,6 @@ impl UpdateStore {
let state = Arc::new(StateLock::from_state(State::Idle));
let (notification_sender, notification_receiver) = mpsc::channel(10);
// Send a first notification to trigger the process.
Ok((
Self {
@@ -134,10 +133,10 @@
path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::create(options, path)?;
let (update_store, mut notification_receiver) = Self::new(options, path)?;
let update_store = Arc::new(update_store);
// trigger the update loop
// Send a first notification to trigger the process.
let _ = update_store.notification_sender.send(());
// Init update loop to perform any pending updates at launch.
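The relocated comment marks a small startup idiom: the store's bounded channel is used purely as a wake-up signal, and one unit message sent at launch makes the update loop drain anything left pending before the restart. A tokio sketch of the idea; the names and the use of try_send are illustrative, not the actual UpdateStore wiring:

    use tokio::sync::mpsc;

    // Sketch: a bounded unit channel as a wake-up signal. If the buffer
    // is full, a wake-up is already pending, so a failed send is harmless
    // -- which is why the send result above is discarded with `let _ =`.
    fn spawn_update_loop() -> mpsc::Sender<()> {
        let (tx, mut rx) = mpsc::channel::<()>(10);

        // Initial nudge: process updates left over from a previous run.
        let _ = tx.try_send(());

        tokio::spawn(async move {
            while rx.recv().await.is_some() {
                // ... pop and process every pending update ...
            }
        });

        // Keep the sender; each registered update sends another nudge.
        tx
    }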

View file

@@ -1,93 +0,0 @@
use std::fs::File;
use crate::index::Index;
use anyhow::Result;
use grenad::CompressionType;
use milli::update::UpdateBuilder;
use rayon::ThreadPool;
use crate::index_controller::UpdateMeta;
use crate::index_controller::{Failed, Processed, Processing};
use crate::option::IndexerOpts;
pub struct UpdateHandler {
max_nb_chunks: Option<usize>,
chunk_compression_level: Option<u32>,
thread_pool: ThreadPool,
log_frequency: usize,
max_memory: usize,
linked_hash_map_size: usize,
chunk_compression_type: CompressionType,
chunk_fusing_shrink_size: u64,
}
impl UpdateHandler {
pub fn new(opt: &IndexerOpts) -> anyhow::Result<Self> {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(opt.indexing_jobs.unwrap_or(0))
.build()?;
Ok(Self {
max_nb_chunks: opt.max_nb_chunks,
chunk_compression_level: opt.chunk_compression_level,
thread_pool,
log_frequency: opt.log_every_n,
max_memory: opt.max_memory.get_bytes() as usize,
linked_hash_map_size: opt.linked_hash_map_size,
chunk_compression_type: opt.chunk_compression_type,
chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
})
}
fn update_builder(&self, update_id: u64) -> UpdateBuilder {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = self.max_nb_chunks {
update_builder.max_nb_chunks(max_nb_chunks);
}
if let Some(chunk_compression_level) = self.chunk_compression_level {
update_builder.chunk_compression_level(chunk_compression_level);
}
update_builder.thread_pool(&self.thread_pool);
update_builder.log_every_n(self.log_frequency);
update_builder.max_memory(self.max_memory);
update_builder.linked_hash_map_size(self.linked_hash_map_size);
update_builder.chunk_compression_type(self.chunk_compression_type);
update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
update_builder
}
pub fn handle_update(
&self,
meta: Processing,
content: Option<File>,
index: Index,
) -> Result<Processed, Failed> {
use UpdateMeta::*;
let update_id = meta.id();
let update_builder = self.update_builder(update_id);
let result = match meta.meta() {
DocumentsAddition {
method,
format,
primary_key,
} => index.update_documents(
*format,
*method,
content,
update_builder,
primary_key.as_deref(),
),
ClearDocuments => index.clear_documents(update_builder),
DeleteDocuments => index.delete_documents(content, update_builder),
Settings(settings) => index.update_settings(settings, update_builder),
};
match result {
Ok(result) => Ok(meta.process(result)),
Err(e) => Err(meta.fail(e.to_string())),
}
}
}

View file

@@ -30,7 +30,6 @@ pub trait UuidStore: Sized {
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
fn load_dump(src: &Path, dst: &Path) -> Result<()>;
}
#[derive(Clone)]
@@ -46,14 +45,7 @@ impl HeedUuidStore {
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
let env = options.open(path)?;
let db = env.create_database(None)?;
Ok(Self { env, db })
}
pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
@@ -154,17 +146,51 @@
let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? {
let (uid, uuid) = entry?;
let uuid = Uuid::from_slice(entry.1)?;
uuids.insert(uuid);
let uid = uid.to_string();
let uuid = Uuid::from_slice(uuid)?;
let entry = DumpEntry {
uuid, uid
};
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write(b"\n").unwrap();
uuids.insert(uuid);
}
Ok(uuids)
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
std::fs::create_dir_all(&uuid_resolver_path)?;
let src_indexes = src.as_ref().join("index_uuids/data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);
let mut line = String::new();
let db = Self::new(dst)?;
let mut txn = db.env.write_txn()?;
loop {
match indexes.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
}
Err(e) => Err(e)?,
}
line.clear();
}
txn.commit()?;
db.env.prepare_for_closing().wait();
Ok(())
}
}
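dump and load_dump round-trip each index through one JSON object per line, pairing the human-readable uid with its uuid. A minimal sketch of that record, assuming the uuid crate's serde and v4 features and the DumpEntry fields shown above:

    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    // One JSONL record per index, as written by `dump` and read back by
    // `load_dump`.
    #[derive(Serialize, Deserialize)]
    struct DumpEntry {
        uuid: Uuid,
        uid: String,
    }

    fn roundtrip() -> anyhow::Result<()> {
        let entry = DumpEntry { uuid: Uuid::new_v4(), uid: "movies".into() };
        let json = serde_json::to_string(&entry)?;
        let back: DumpEntry = serde_json::from_str(&json)?;
        assert_eq!(back.uid, entry.uid);
        Ok(())
    }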
#[async_trait::async_trait]
@@ -207,33 +233,4 @@ impl UuidStore for HeedUuidStore {
let this = self.clone();
tokio::task::spawn_blocking(move || this.dump(path)).await?
}
async fn load_dump(src: &Path, dst: &Path) -> Result<()> {
let uuid_resolver_path = dst.join("uuid_resolver/");
std::fs::create_dir_all(&uuid_resolver_path)?;
let src_indexes = src.join("index_uuids/data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);
let mut line = String::new();
let db = Self::new(dst)?;
let mut txn = db.env.write_txn()?;
loop {
match indexes.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
}
Err(e) => Err(e)?,
}
line.clear();
}
txn.commit()?;
Ok(())
}
}