mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 14:54:27 +01:00
review edits
This commit is contained in:
parent
1c4f0b2ccf
commit
6609f9e3be
@ -1,12 +1,9 @@
|
|||||||
use std::{
|
use std::fs::{create_dir_all, File};
|
||||||
fs::{create_dir_all, File},
|
use std::io::{BufRead, BufReader, Write};
|
||||||
io::{BufRead, BufReader},
|
use std::path::Path;
|
||||||
path::Path,
|
use std::sync::Arc;
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::{bail, Context};
|
||||||
use anyhow::Context;
|
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
|
use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
|
||||||
@ -55,7 +52,7 @@ impl Index {
|
|||||||
}
|
}
|
||||||
|
|
||||||
serde_json::to_writer(&mut document_file, &json_map)?;
|
serde_json::to_writer(&mut document_file, &json_map)?;
|
||||||
std::io::Write::write(&mut document_file, b"\n")?;
|
document_file.write_all(b"\n")?;
|
||||||
|
|
||||||
json_map.clear();
|
json_map.clear();
|
||||||
}
|
}
|
||||||
@ -82,7 +79,7 @@ impl Index {
|
|||||||
pub fn load_dump(
|
pub fn load_dump(
|
||||||
src: impl AsRef<Path>,
|
src: impl AsRef<Path>,
|
||||||
dst: impl AsRef<Path>,
|
dst: impl AsRef<Path>,
|
||||||
size: u64,
|
size: usize,
|
||||||
indexing_options: &IndexerOpts,
|
indexing_options: &IndexerOpts,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let dir_name = src
|
let dir_name = src
|
||||||
@ -99,7 +96,7 @@ impl Index {
|
|||||||
primary_key,
|
primary_key,
|
||||||
} = serde_json::from_reader(&mut meta_file)?;
|
} = serde_json::from_reader(&mut meta_file)?;
|
||||||
let settings = settings.check();
|
let settings = settings.check();
|
||||||
let index = Self::open(&dst_dir_path, size as usize)?;
|
let index = Self::open(&dst_dir_path, size)?;
|
||||||
let mut txn = index.write_txn()?;
|
let mut txn = index.write_txn()?;
|
||||||
|
|
||||||
let handler = UpdateHandler::new(&indexing_options)?;
|
let handler = UpdateHandler::new(&indexing_options)?;
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
|
use std::collections::{BTreeSet, HashSet};
|
||||||
|
use std::fs::create_dir_all;
|
||||||
|
use std::marker::PhantomData;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{
|
|
||||||
collections::{BTreeSet, HashSet},
|
|
||||||
marker::PhantomData,
|
|
||||||
path::Path,
|
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::{bail, Context};
|
use anyhow::{bail, Context};
|
||||||
use heed::{EnvOpenOptions, RoTxn};
|
use heed::{EnvOpenOptions, RoTxn};
|
||||||
@ -44,7 +43,7 @@ where
|
|||||||
|
|
||||||
impl Index {
|
impl Index {
|
||||||
pub fn open(path: impl AsRef<Path>, size: usize) -> anyhow::Result<Self> {
|
pub fn open(path: impl AsRef<Path>, size: usize) -> anyhow::Result<Self> {
|
||||||
std::fs::create_dir_all(&path)?;
|
create_dir_all(&path)?;
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(size);
|
options.map_size(size);
|
||||||
let index = milli::Index::new(options, &path)?;
|
let index = milli::Index::new(options, &path)?;
|
||||||
@ -113,8 +112,6 @@ impl Index {
|
|||||||
|
|
||||||
let mut documents = Vec::new();
|
let mut documents = Vec::new();
|
||||||
|
|
||||||
println!("fields to display: {:?}", fields_to_display);
|
|
||||||
|
|
||||||
for entry in iter {
|
for entry in iter {
|
||||||
let (_id, obkv) = entry?;
|
let (_id, obkv) = entry?;
|
||||||
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
|
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
|
||||||
|
@ -197,10 +197,8 @@ impl Index {
|
|||||||
builder.update_format(format);
|
builder.update_format(format);
|
||||||
builder.index_documents_method(method);
|
builder.index_documents_method(method);
|
||||||
|
|
||||||
//let indexing_callback =
|
let indexing_callback =
|
||||||
//|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
|
|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
|
||||||
|
|
||||||
let indexing_callback = |_, _| ();
|
|
||||||
|
|
||||||
let gzipped = false;
|
let gzipped = false;
|
||||||
let addition = match content {
|
let addition = match content {
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::{
|
|
||||||
collections::HashMap,
|
|
||||||
path::{Path, PathBuf},
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
@ -24,8 +22,8 @@ pub struct DumpActor<UuidResolver, Update> {
|
|||||||
dump_path: PathBuf,
|
dump_path: PathBuf,
|
||||||
lock: Arc<Mutex<()>>,
|
lock: Arc<Mutex<()>>,
|
||||||
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
|
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
|
||||||
update_db_size: u64,
|
update_db_size: usize,
|
||||||
index_db_size: u64,
|
index_db_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate uid from creation date
|
/// Generate uid from creation date
|
||||||
@ -43,8 +41,8 @@ where
|
|||||||
uuid_resolver: UuidResolver,
|
uuid_resolver: UuidResolver,
|
||||||
update: Update,
|
update: Update,
|
||||||
dump_path: impl AsRef<Path>,
|
dump_path: impl AsRef<Path>,
|
||||||
index_db_size: u64,
|
index_db_size: usize,
|
||||||
update_db_size: u64,
|
update_db_size: usize,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
|
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
|
||||||
let lock = Arc::new(Mutex::new(()));
|
let lock = Arc::new(Mutex::new(()));
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
|
|
||||||
use actix_web::web::Bytes;
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
|
use actix_web::web::Bytes;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
|
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct DumpActorHandleImpl {
|
pub struct DumpActorHandleImpl {
|
||||||
sender: mpsc::Sender<DumpMsg>,
|
sender: mpsc::Sender<DumpMsg>,
|
||||||
@ -30,8 +32,8 @@ impl DumpActorHandleImpl {
|
|||||||
path: impl AsRef<Path>,
|
path: impl AsRef<Path>,
|
||||||
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
|
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
|
||||||
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
|
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
|
||||||
index_db_size: u64,
|
index_db_size: usize,
|
||||||
update_db_size: u64,
|
update_db_size: usize,
|
||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
let (sender, receiver) = mpsc::channel(10);
|
let (sender, receiver) = mpsc::channel(10);
|
||||||
let actor = DumpActor::new(
|
let actor = DumpActor::new(
|
||||||
|
@ -1,22 +1,20 @@
|
|||||||
use std::{
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
collections::{BTreeMap, BTreeSet},
|
use std::fs::{create_dir_all, File};
|
||||||
fs::File,
|
use std::io::BufRead;
|
||||||
io::BufRead,
|
use std::marker::PhantomData;
|
||||||
marker::PhantomData,
|
use std::path::Path;
|
||||||
path::Path,
|
use std::sync::Arc;
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use heed::EnvOpenOptions;
|
use heed::EnvOpenOptions;
|
||||||
use log::{error, info, warn};
|
use log::{error, info, warn};
|
||||||
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{index::deserialize_some, index_controller::uuid_resolver::HeedUuidStore};
|
use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata};
|
||||||
use crate::{
|
use crate::{
|
||||||
index::{Index, Unchecked},
|
index::{deserialize_some, update_handler::UpdateHandler, Index, Unchecked},
|
||||||
index_controller::{self, IndexMetadata},
|
option::IndexerOpts,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
@ -32,28 +30,33 @@ impl MetadataV1 {
|
|||||||
src: impl AsRef<Path>,
|
src: impl AsRef<Path>,
|
||||||
dst: impl AsRef<Path>,
|
dst: impl AsRef<Path>,
|
||||||
size: usize,
|
size: usize,
|
||||||
|
indexer_options: &IndexerOpts,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
info!(
|
info!(
|
||||||
"Loading dump, dump database version: {}, dump version: V1",
|
"Loading dump, dump database version: {}, dump version: V1",
|
||||||
self.db_version
|
self.db_version
|
||||||
);
|
);
|
||||||
|
|
||||||
dbg!("here");
|
|
||||||
|
|
||||||
let uuid_store = HeedUuidStore::new(&dst)?;
|
let uuid_store = HeedUuidStore::new(&dst)?;
|
||||||
dbg!("here");
|
|
||||||
for index in self.indexes {
|
for index in self.indexes {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
uuid_store.insert(index.uid.clone(), uuid)?;
|
uuid_store.insert(index.uid.clone(), uuid)?;
|
||||||
let src = src.as_ref().join(index.uid);
|
let src = src.as_ref().join(index.uid);
|
||||||
load_index(&src, &dst, uuid, index.meta.primary_key.as_deref(), size)?;
|
load_index(
|
||||||
|
&src,
|
||||||
|
&dst,
|
||||||
|
uuid,
|
||||||
|
index.meta.primary_key.as_deref(),
|
||||||
|
size,
|
||||||
|
indexer_options,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//This is the settings used in the last version of meilisearch exporting dump in V1
|
// These are the settings used in legacy meilisearch (<v0.21.0).
|
||||||
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
|
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
|
||||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||||
struct Settings {
|
struct Settings {
|
||||||
@ -87,10 +90,11 @@ fn load_index(
|
|||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
primary_key: Option<&str>,
|
primary_key: Option<&str>,
|
||||||
size: usize,
|
size: usize,
|
||||||
|
indexer_options: &IndexerOpts,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
|
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
|
||||||
|
|
||||||
std::fs::create_dir_all(&index_path)?;
|
create_dir_all(&index_path)?;
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(size);
|
options.map_size(size);
|
||||||
let index = milli::Index::new(options, index_path)?;
|
let index = milli::Index::new(options, index_path)?;
|
||||||
@ -99,31 +103,37 @@ fn load_index(
|
|||||||
// extract `settings.json` file and import content
|
// extract `settings.json` file and import content
|
||||||
let settings = import_settings(&src)?;
|
let settings = import_settings(&src)?;
|
||||||
let settings: index_controller::Settings<Unchecked> = settings.into();
|
let settings: index_controller::Settings<Unchecked> = settings.into();
|
||||||
let update_builder = UpdateBuilder::new(0);
|
|
||||||
index.update_settings(&settings.check(), update_builder)?;
|
|
||||||
|
|
||||||
let update_builder = UpdateBuilder::new(0);
|
let mut txn = index.write_txn()?;
|
||||||
|
|
||||||
|
let handler = UpdateHandler::new(&indexer_options)?;
|
||||||
|
|
||||||
|
index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?;
|
||||||
|
|
||||||
let file = File::open(&src.as_ref().join("documents.jsonl"))?;
|
let file = File::open(&src.as_ref().join("documents.jsonl"))?;
|
||||||
let mut reader = std::io::BufReader::new(file);
|
let mut reader = std::io::BufReader::new(file);
|
||||||
reader.fill_buf()?;
|
reader.fill_buf()?;
|
||||||
if !reader.buffer().is_empty() {
|
if !reader.buffer().is_empty() {
|
||||||
index.update_documents(
|
index.update_documents_txn(
|
||||||
|
&mut txn,
|
||||||
UpdateFormat::JsonStream,
|
UpdateFormat::JsonStream,
|
||||||
IndexDocumentsMethod::ReplaceDocuments,
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
Some(reader),
|
Some(reader),
|
||||||
update_builder,
|
handler.update_builder(0),
|
||||||
primary_key,
|
primary_key,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the last step: we extract the original milli::Index and close it
|
txn.commit()?;
|
||||||
|
|
||||||
|
// Finaly, we extract the original milli::Index and close it
|
||||||
Arc::try_unwrap(index.0)
|
Arc::try_unwrap(index.0)
|
||||||
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
|
.map_err(|_e| "Couln't close index properly")
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.prepare_for_closing()
|
.prepare_for_closing()
|
||||||
.wait();
|
.wait();
|
||||||
|
|
||||||
// Ignore updates in v1.
|
// Updates are ignored in dumps V1.
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -172,7 +182,7 @@ impl From<Settings> for index_controller::Settings<Unchecked> {
|
|||||||
|
|
||||||
/// Extract Settings from `settings.json` file present at provided `dir_path`
|
/// Extract Settings from `settings.json` file present at provided `dir_path`
|
||||||
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
|
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
|
||||||
let path = dbg!(dir_path.as_ref().join("settings.json"));
|
let path = dir_path.as_ref().join("settings.json");
|
||||||
let file = File::open(path)?;
|
let file = File::open(path)?;
|
||||||
let reader = std::io::BufReader::new(file);
|
let reader = std::io::BufReader::new(file);
|
||||||
let metadata = serde_json::from_reader(reader)?;
|
let metadata = serde_json::from_reader(reader)?;
|
||||||
|
@ -4,23 +4,21 @@ use chrono::{DateTime, Utc};
|
|||||||
use log::info;
|
use log::info;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{
|
use crate::index::Index;
|
||||||
index::Index,
|
use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore};
|
||||||
index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore},
|
use crate::option::IndexerOpts;
|
||||||
option::IndexerOpts,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct MetadataV2 {
|
pub struct MetadataV2 {
|
||||||
db_version: String,
|
db_version: String,
|
||||||
index_db_size: u64,
|
index_db_size: usize,
|
||||||
update_db_size: u64,
|
update_db_size: usize,
|
||||||
dump_date: DateTime<Utc>,
|
dump_date: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetadataV2 {
|
impl MetadataV2 {
|
||||||
pub fn new(index_db_size: u64, update_db_size: u64) -> Self {
|
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
db_version: env!("CARGO_PKG_VERSION").to_string(),
|
db_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||||
index_db_size,
|
index_db_size,
|
||||||
@ -33,9 +31,8 @@ impl MetadataV2 {
|
|||||||
self,
|
self,
|
||||||
src: impl AsRef<Path>,
|
src: impl AsRef<Path>,
|
||||||
dst: impl AsRef<Path>,
|
dst: impl AsRef<Path>,
|
||||||
// TODO: use these variable to test if loading the index is possible.
|
index_db_size: usize,
|
||||||
_index_db_size: u64,
|
update_db_size: usize,
|
||||||
_update_db_size: u64,
|
|
||||||
indexing_options: &IndexerOpts,
|
indexing_options: &IndexerOpts,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
info!(
|
info!(
|
||||||
@ -47,14 +44,14 @@ impl MetadataV2 {
|
|||||||
HeedUuidStore::load_dump(src.as_ref(), &dst)?;
|
HeedUuidStore::load_dump(src.as_ref(), &dst)?;
|
||||||
|
|
||||||
info!("Loading updates.");
|
info!("Loading updates.");
|
||||||
UpdateStore::load_dump(&src, &dst, self.update_db_size)?;
|
UpdateStore::load_dump(&src, &dst, update_db_size)?;
|
||||||
|
|
||||||
info!("Loading indexes");
|
info!("Loading indexes.");
|
||||||
let indexes_path = src.as_ref().join("indexes");
|
let indexes_path = src.as_ref().join("indexes");
|
||||||
let indexes = indexes_path.read_dir()?;
|
let indexes = indexes_path.read_dir()?;
|
||||||
for index in indexes {
|
for index in indexes {
|
||||||
let index = index?;
|
let index = index?;
|
||||||
Index::load_dump(&index.path(), &dst, self.index_db_size, indexing_options)?;
|
Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -8,6 +8,7 @@ use log::{error, info, warn};
|
|||||||
use mockall::automock;
|
use mockall::automock;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use tokio::fs::create_dir_all;
|
||||||
|
|
||||||
use loaders::v1::MetadataV1;
|
use loaders::v1::MetadataV1;
|
||||||
use loaders::v2::MetadataV2;
|
use loaders::v2::MetadataV2;
|
||||||
@ -15,7 +16,6 @@ use loaders::v2::MetadataV2;
|
|||||||
pub use actor::DumpActor;
|
pub use actor::DumpActor;
|
||||||
pub use handle_impl::*;
|
pub use handle_impl::*;
|
||||||
pub use message::DumpMsg;
|
pub use message::DumpMsg;
|
||||||
use tokio::fs::create_dir_all;
|
|
||||||
|
|
||||||
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
|
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
|
||||||
use crate::{helpers::compression, option::IndexerOpts};
|
use crate::{helpers::compression, option::IndexerOpts};
|
||||||
@ -61,7 +61,7 @@ pub enum Metadata {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Metadata {
|
impl Metadata {
|
||||||
pub fn new_v2(index_db_size: u64, update_db_size: u64) -> Self {
|
pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self {
|
||||||
let meta = MetadataV2::new(index_db_size, update_db_size);
|
let meta = MetadataV2::new(index_db_size, update_db_size);
|
||||||
Self::V2(meta)
|
Self::V2(meta)
|
||||||
}
|
}
|
||||||
@ -117,8 +117,8 @@ impl DumpInfo {
|
|||||||
pub fn load_dump(
|
pub fn load_dump(
|
||||||
dst_path: impl AsRef<Path>,
|
dst_path: impl AsRef<Path>,
|
||||||
src_path: impl AsRef<Path>,
|
src_path: impl AsRef<Path>,
|
||||||
index_db_size: u64,
|
index_db_size: usize,
|
||||||
update_db_size: u64,
|
update_db_size: usize,
|
||||||
indexer_opts: &IndexerOpts,
|
indexer_opts: &IndexerOpts,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let tmp_src = tempfile::tempdir_in(".")?;
|
let tmp_src = tempfile::tempdir_in(".")?;
|
||||||
@ -139,7 +139,7 @@ pub fn load_dump(
|
|||||||
|
|
||||||
match meta {
|
match meta {
|
||||||
Metadata::V1(meta) => {
|
Metadata::V1(meta) => {
|
||||||
meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size as usize)?
|
meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)?
|
||||||
}
|
}
|
||||||
Metadata::V2(meta) => meta.load_dump(
|
Metadata::V2(meta) => meta.load_dump(
|
||||||
&tmp_src_path,
|
&tmp_src_path,
|
||||||
@ -166,8 +166,8 @@ struct DumpTask<U, P> {
|
|||||||
uuid_resolver: U,
|
uuid_resolver: U,
|
||||||
update_handle: P,
|
update_handle: P,
|
||||||
uid: String,
|
uid: String,
|
||||||
update_db_size: u64,
|
update_db_size: usize,
|
||||||
index_db_size: u64,
|
index_db_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<U, P> DumpTask<U, P>
|
impl<U, P> DumpTask<U, P>
|
||||||
|
@ -97,8 +97,8 @@ impl IndexController {
|
|||||||
load_dump(
|
load_dump(
|
||||||
&options.db_path,
|
&options.db_path,
|
||||||
src_path,
|
src_path,
|
||||||
options.max_mdb_size.get_bytes(),
|
options.max_mdb_size.get_bytes() as usize,
|
||||||
options.max_udb_size.get_bytes(),
|
options.max_udb_size.get_bytes() as usize,
|
||||||
&options.indexer_options,
|
&options.indexer_options,
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
@ -116,8 +116,8 @@ impl IndexController {
|
|||||||
&options.dumps_dir,
|
&options.dumps_dir,
|
||||||
uuid_resolver.clone(),
|
uuid_resolver.clone(),
|
||||||
update_handle.clone(),
|
update_handle.clone(),
|
||||||
options.max_mdb_size.get_bytes(),
|
options.max_mdb_size.get_bytes() as usize,
|
||||||
options.max_udb_size.get_bytes(),
|
options.max_udb_size.get_bytes() as usize,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
if options.schedule_snapshot {
|
if options.schedule_snapshot {
|
||||||
|
@ -197,7 +197,7 @@ where
|
|||||||
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
|
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
|
||||||
let index_handle = self.index_handle.clone();
|
let index_handle = self.index_handle.clone();
|
||||||
let update_store = self.store.clone();
|
let update_store = self.store.clone();
|
||||||
println!("starting dump");
|
|
||||||
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
|
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
|
||||||
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
|
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -129,7 +129,7 @@ impl UpdateStore {
|
|||||||
pub fn load_dump(
|
pub fn load_dump(
|
||||||
src: impl AsRef<Path>,
|
src: impl AsRef<Path>,
|
||||||
dst: impl AsRef<Path>,
|
dst: impl AsRef<Path>,
|
||||||
db_size: u64,
|
db_size: usize,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let dst_update_path = dst.as_ref().join("updates/");
|
let dst_update_path = dst.as_ref().join("updates/");
|
||||||
create_dir_all(&dst_update_path)?;
|
create_dir_all(&dst_update_path)?;
|
||||||
|
@ -589,9 +589,7 @@ mod test {
|
|||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let store_clone = update_store.clone();
|
let store_clone = update_store.clone();
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
store_clone
|
store_clone.register_update(meta, None, uuid).unwrap();
|
||||||
.register_update(meta, None, uuid)
|
|
||||||
.unwrap();
|
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -1,14 +1,10 @@
|
|||||||
|
use std::collections::HashSet;
|
||||||
use std::fs::{create_dir_all, File};
|
use std::fs::{create_dir_all, File};
|
||||||
|
use std::io::{BufRead, BufReader, Write};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::{
|
|
||||||
collections::HashSet,
|
|
||||||
io::{BufRead, BufReader, Write},
|
|
||||||
};
|
|
||||||
|
|
||||||
use heed::{
|
use heed::types::{ByteSlice, Str};
|
||||||
types::{ByteSlice, Str},
|
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
|
||||||
CompactionOption, Database, Env, EnvOpenOptions,
|
|
||||||
};
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
@ -21,6 +17,8 @@ struct DumpEntry {
|
|||||||
uid: String,
|
uid: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const UUIDS_DB_PATH: &str = "index_uuids";
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait UuidStore: Sized {
|
pub trait UuidStore: Sized {
|
||||||
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
|
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
|
||||||
@ -43,7 +41,7 @@ pub struct HeedUuidStore {
|
|||||||
|
|
||||||
impl HeedUuidStore {
|
impl HeedUuidStore {
|
||||||
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
|
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
|
||||||
let path = path.as_ref().join("index_uuids");
|
let path = path.as_ref().join(UUIDS_DB_PATH);
|
||||||
create_dir_all(&path)?;
|
create_dir_all(&path)?;
|
||||||
let mut options = EnvOpenOptions::new();
|
let mut options = EnvOpenOptions::new();
|
||||||
options.map_size(UUID_STORE_SIZE); // 1GB
|
options.map_size(UUID_STORE_SIZE); // 1GB
|
||||||
@ -137,7 +135,7 @@ impl HeedUuidStore {
|
|||||||
|
|
||||||
// only perform snapshot if there are indexes
|
// only perform snapshot if there are indexes
|
||||||
if !entries.is_empty() {
|
if !entries.is_empty() {
|
||||||
path.push("index_uuids");
|
path.push(UUIDS_DB_PATH);
|
||||||
create_dir_all(&path).unwrap();
|
create_dir_all(&path).unwrap();
|
||||||
path.push("data.mdb");
|
path.push("data.mdb");
|
||||||
env.copy_to_path(path, CompactionOption::Enabled)?;
|
env.copy_to_path(path, CompactionOption::Enabled)?;
|
||||||
@ -150,7 +148,7 @@ impl HeedUuidStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
|
||||||
let dump_path = path.join("index_uuids");
|
let dump_path = path.join(UUIDS_DB_PATH);
|
||||||
create_dir_all(&dump_path)?;
|
create_dir_all(&dump_path)?;
|
||||||
let dump_file_path = dump_path.join("data.jsonl");
|
let dump_file_path = dump_path.join("data.jsonl");
|
||||||
let mut dump_file = File::create(&dump_file_path)?;
|
let mut dump_file = File::create(&dump_file_path)?;
|
||||||
@ -173,10 +171,10 @@ impl HeedUuidStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
|
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||||
let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
|
let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
|
||||||
std::fs::create_dir_all(&uuid_resolver_path)?;
|
std::fs::create_dir_all(&uuid_resolver_path)?;
|
||||||
|
|
||||||
let src_indexes = src.as_ref().join("index_uuids/data.jsonl");
|
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
|
||||||
let indexes = File::open(&src_indexes)?;
|
let indexes = File::open(&src_indexes)?;
|
||||||
let mut indexes = BufReader::new(indexes);
|
let mut indexes = BufReader::new(indexes);
|
||||||
let mut line = String::new();
|
let mut line = String::new();
|
||||||
|
Loading…
Reference in New Issue
Block a user