191: dumps v2 r=irevoire a=MarinPostma



Co-authored-by: Marin Postma <postma.marin@protonmail.com>
Co-authored-by: marin <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2021-06-01 09:46:31 +00:00 committed by GitHub
commit 3a7c1f2469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1261 additions and 994 deletions

View File

@ -50,7 +50,7 @@ mod mini_dashboard {
sha1_file.read_to_string(&mut sha1)?;
if sha1 == meta["sha1"].as_str().unwrap() {
// Nothing to do.
return Ok(())
return Ok(());
}
}
@ -62,7 +62,11 @@ mod mini_dashboard {
hasher.update(&dashboard_assets_bytes);
let sha1 = hex::encode(hasher.finalize());
assert_eq!(meta["sha1"].as_str().unwrap(), sha1, "Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml");
assert_eq!(
meta["sha1"].as_str().unwrap(),
sha1,
"Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml"
);
create_dir_all(&dashboard_dir)?;
let cursor = Cursor::new(&dashboard_assets_bytes);

View File

@ -4,7 +4,9 @@ use std::sync::Arc;
use sha2::Digest;
use crate::index::{Checked, Settings};
use crate::index_controller::{IndexController, IndexStats, Stats, DumpInfo, IndexMetadata, IndexSettings};
use crate::index_controller::{
DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats,
};
use crate::option::Opt;
pub mod search;
@ -67,7 +69,11 @@ impl Data {
api_keys.generate_missing_api_keys();
let inner = DataInner { index_controller, api_keys, options };
let inner = DataInner {
index_controller,
api_keys,
options,
};
let inner = Arc::new(inner);
Ok(Data { inner })

View File

@ -299,7 +299,7 @@ impl From<JsonPayloadError> for Error {
JsonPayloadError::Payload(err) => {
Error::BadRequest(format!("Problem while decoding the request: {}", err))
}
e => Error::Internal(format!("Unexpected Json error: {}", e))
e => Error::Internal(format!("Unexpected Json error: {}", e)),
}
}
}
@ -310,7 +310,7 @@ impl From<QueryPayloadError> for Error {
QueryPayloadError::Deserialize(err) => {
Error::BadRequest(format!("Invalid query parameters: {}", err))
}
e => Error::Internal(format!("Unexpected query payload error: {}", e))
e => Error::Internal(format!("Unexpected query payload error: {}", e)),
}
}
}

View File

@ -1,16 +1,16 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::body::Body;
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use actix_web::web;
use actix_web::body::Body;
use futures::ready;
use futures::future::{ok, Future, Ready};
use actix_web::ResponseError as _;
use futures::future::{ok, Future, Ready};
use futures::ready;
use pin_project::pin_project;
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::Data;
#[derive(Clone, Copy)]
pub enum Authentication {
@ -59,19 +59,15 @@ where
let data = req.app_data::<web::Data<Data>>().unwrap();
if data.api_keys().master.is_none() {
return AuthenticationFuture::Authenticated(self.service.call(req))
return AuthenticationFuture::Authenticated(self.service.call(req));
}
let auth_header = match req.headers().get("X-Meili-API-Key") {
Some(auth) => match auth.to_str() {
Ok(auth) => auth,
Err(_) => {
return AuthenticationFuture::NoHeader(Some(req))
}
Err(_) => return AuthenticationFuture::NoHeader(Some(req)),
},
None => {
return AuthenticationFuture::NoHeader(Some(req))
}
None => return AuthenticationFuture::NoHeader(Some(req)),
};
let authenticated = match self.acl {
@ -114,12 +110,10 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this {
AuthProj::Authenticated(fut) => {
match ready!(fut.poll(cx)) {
AuthProj::Authenticated(fut) => match ready!(fut.poll(cx)) {
Ok(resp) => Poll::Ready(Ok(resp)),
Err(e) => Poll::Ready(Err(e)),
}
}
},
AuthProj::NoHeader(req) => {
match req.take() {
Some(req) => {
@ -135,7 +129,8 @@ where
AuthProj::Refused(req) => {
match req.take() {
Some(req) => {
let bad_token = req.headers()
let bad_token = req
.headers()
.get("X-Meili-API-Key")
.map(|h| h.to_str().map(String::from).unwrap_or_default())
.unwrap_or_default();

View File

@ -0,0 +1,132 @@
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Context};
use heed::RoTxn;
use indexmap::IndexMap;
use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
use serde::{Deserialize, Serialize};
use crate::option::IndexerOpts;
use super::{update_handler::UpdateHandler, Index, Settings, Unchecked};
#[derive(Serialize, Deserialize)]
struct DumpMeta {
settings: Settings<Unchecked>,
primary_key: Option<String>,
}
const META_FILE_NAME: &str = "meta.json";
const DATA_FILE_NAME: &str = "documents.jsonl";
impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
// acquire write txn make sure any ongoing write is finished before we start.
let txn = self.env.write_txn()?;
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
Ok(())
}
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
let document_file_path = path.as_ref().join(DATA_FILE_NAME);
let mut document_file = File::create(&document_file_path)?;
let documents = self.all_documents(txn)?;
let fields_ids_map = self.fields_ids_map(txn)?;
// dump documents
let mut json_map = IndexMap::new();
for document in documents {
let (_, reader) = document?;
for (fid, bytes) in reader.iter() {
if let Some(name) = fields_ids_map.name(fid) {
json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
}
}
serde_json::to_writer(&mut document_file, &json_map)?;
document_file.write_all(b"\n")?;
json_map.clear();
}
Ok(())
}
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
let meta_file_path = path.as_ref().join(META_FILE_NAME);
let mut meta_file = File::create(&meta_file_path)?;
let settings = self.settings_txn(txn)?.into_unchecked();
let primary_key = self.primary_key(txn)?.map(String::from);
let meta = DumpMeta {
settings,
primary_key,
};
serde_json::to_writer(&mut meta_file, &meta)?;
Ok(())
}
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
let dir_name = src
.as_ref()
.file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME);
let mut meta_file = File::open(meta_path)?;
let DumpMeta {
settings,
primary_key,
} = serde_json::from_reader(&mut meta_file)?;
let settings = settings.check();
let index = Self::open(&dst_dir_path, size)?;
let mut txn = index.write_txn()?;
let handler = UpdateHandler::new(&indexing_options)?;
index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = File::open(&document_file_path)?;
let mut reader = BufReader::new(reader);
reader.fill_buf()?;
// If the document file is empty, we don't perform the document addition, to prevent
// a primary key error to be thrown.
if !reader.buffer().is_empty() {
index.update_documents_txn(
&mut txn,
JsonStream,
IndexDocumentsMethod::UpdateDocuments,
Some(reader),
handler.update_builder(0),
primary_key.as_deref(),
)?;
}
txn.commit()?;
match Arc::try_unwrap(index.0) {
Ok(inner) => inner.prepare_for_closing().wait(),
Err(_) => bail!("Could not close index properly."),
}
Ok(())
}
}

View File

@ -1,17 +1,23 @@
use std::{collections::{BTreeSet, HashSet}, marker::PhantomData};
use std::collections::{BTreeSet, HashSet};
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Context};
use heed::{EnvOpenOptions, RoTxn};
use milli::obkv_to_json;
use serde_json::{Map, Value};
use crate::helpers::EnvSizer;
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Facets, Settings, Checked, Unchecked};
use serde::{de::Deserializer, Deserialize};
pub use updates::{Checked, Facets, Settings, Unchecked};
mod dump;
mod search;
pub mod update_handler;
mod updates;
pub type Document = Map<String, Value>;
@ -36,9 +42,20 @@ where
}
impl Index {
pub fn open(path: impl AsRef<Path>, size: usize) -> anyhow::Result<Self> {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path)?;
Ok(Index(Arc::new(index)))
}
pub fn settings(&self) -> anyhow::Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)
}
pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(&txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
@ -95,8 +112,6 @@ impl Index {
let mut documents = Vec::new();
println!("fields to display: {:?}", fields_to_display);
for entry in iter {
let (_id, obkv) = entry?;
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;

View File

@ -90,7 +90,8 @@ impl Index {
let mut documents = Vec::new();
let fields_ids_map = self.fields_ids_map(&rtxn).unwrap();
let displayed_ids = self.displayed_fields_ids(&rtxn)?
let displayed_ids = self
.displayed_fields_ids(&rtxn)?
.map(|fields| fields.into_iter().collect::<HashSet<_>>())
.unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect());
@ -156,10 +157,8 @@ impl Index {
};
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
for (_id, obkv) in self.documents(&rtxn, documents_ids)? {
let document = make_document(&all_attributes, &fields_ids_map, obkv)?;
@ -384,17 +383,16 @@ mod test {
#[test]
fn no_formatted() {
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
let mut fields = FieldsIdsMap::new();
let id = fields.insert("test").unwrap();
let mut buf = Vec::new();
let mut obkv = obkv::KvWriter::new(&mut buf);
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
.unwrap();
obkv.finish().unwrap();
let obkv = obkv::KvReader::new(&buf);
@ -410,8 +408,9 @@ mod test {
&highlighter,
&matching_words,
&all_formatted,
&to_highlight_ids
).unwrap();
&to_highlight_ids,
)
.unwrap();
assert!(value.is_empty());
}
@ -419,17 +418,16 @@ mod test {
#[test]
fn formatted_no_highlight() {
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
let mut fields = FieldsIdsMap::new();
let id = fields.insert("test").unwrap();
let mut buf = Vec::new();
let mut obkv = obkv::KvWriter::new(&mut buf);
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
.unwrap();
obkv.finish().unwrap();
let obkv = obkv::KvReader::new(&buf);
@ -445,8 +443,9 @@ mod test {
&highlighter,
&matching_words,
&all_formatted,
&to_highlight_ids
).unwrap();
&to_highlight_ids,
)
.unwrap();
assert_eq!(value["test"], "hello");
}
@ -454,17 +453,16 @@ mod test {
#[test]
fn formatted_with_highlight() {
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
let mut fields = FieldsIdsMap::new();
let id = fields.insert("test").unwrap();
let mut buf = Vec::new();
let mut obkv = obkv::KvWriter::new(&mut buf);
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
.unwrap();
obkv.finish().unwrap();
let obkv = obkv::KvReader::new(&buf);
@ -480,8 +478,9 @@ mod test {
&highlighter,
&matching_words,
&all_formatted,
&to_highlight_ids
).unwrap();
&to_highlight_ids,
)
.unwrap();
assert_eq!(value["test"], "<em>hello</em>");
}

View File

@ -38,7 +38,7 @@ impl UpdateHandler {
})
}
fn update_builder(&self, update_id: u64) -> UpdateBuilder {
pub fn update_builder(&self, update_id: u64) -> UpdateBuilder {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = self.max_nb_chunks {

View File

@ -87,6 +87,28 @@ impl Settings<Checked> {
_kind: PhantomData,
}
}
pub fn into_unchecked(self) -> Settings<Unchecked> {
let Self {
displayed_attributes,
searchable_attributes,
attributes_for_faceting,
ranking_rules,
stop_words,
distinct_attribute,
..
} = self;
Settings {
displayed_attributes,
searchable_attributes,
attributes_for_faceting,
ranking_rules,
stop_words,
distinct_attribute,
_kind: PhantomData,
}
}
}
impl Settings<Unchecked> {

View File

@ -1,27 +1,29 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
use crate::helpers::compression;
use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_stream::stream;
use chrono::Utc;
use futures::stream::StreamExt;
use futures::{lock::Mutex, stream::StreamExt};
use log::{error, info};
use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle;
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask};
use crate::index_controller::{update_actor, uuid_resolver};
pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor<UuidResolver, Index, Update> {
pub struct DumpActor<UuidResolver, Update> {
inbox: Option<mpsc::Receiver<DumpMsg>>,
uuid_resolver: UuidResolver,
index: Index,
update: Update,
dump_path: PathBuf,
dump_info: Arc<RwLock<Option<DumpInfo>>>,
lock: Arc<Mutex<()>>,
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
update_db_size: usize,
index_db_size: usize,
}
/// Generate uid from creation date
@ -29,26 +31,30 @@ fn generate_uid() -> String {
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
}
impl<UuidResolver, Index, Update> DumpActor<UuidResolver, Index, Update>
impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
Update: UpdateActorHandle + Send + Sync + Clone + 'static,
{
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolver,
index: Index,
update: Update,
dump_path: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
) -> Self {
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
let lock = Arc::new(Mutex::new(()));
Self {
inbox: Some(inbox),
uuid_resolver,
index,
update,
dump_path: dump_path.as_ref().into(),
dump_info: Arc::new(RwLock::new(None)),
dump_infos,
lock,
index_db_size,
update_db_size,
}
}
@ -90,149 +96,61 @@ where
}
async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
if self.is_running().await {
let uid = generate_uid();
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
let _lock = match self.lock.try_lock() {
Some(lock) => lock,
None => {
ret.send(Err(DumpError::DumpAlreadyRunning))
.expect("Dump actor is dead");
return;
}
let uid = generate_uid();
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
*self.dump_info.write().await = Some(info.clone());
};
self.dump_infos
.write()
.await
.insert(uid.clone(), info.clone());
ret.send(Ok(info)).expect("Dump actor is dead");
let dump_info = self.dump_info.clone();
let task = DumpTask {
path: self.dump_path.clone(),
uuid_resolver: self.uuid_resolver.clone(),
update_handle: self.update.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,
index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn(perform_dump(
self.dump_path.clone(),
self.uuid_resolver.clone(),
self.index.clone(),
self.update.clone(),
uid.clone(),
))
.await;
let task_result = tokio::task::spawn(task.run()).await;
let mut dump_infos = self.dump_infos.write().await;
let dump_infos = dump_infos
.get_mut(&uid)
.expect("dump entry deleted while lock was acquired");
match task_result {
Ok(Ok(())) => {
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").done();
dump_infos.done();
info!("Dump succeed");
}
Ok(Err(e)) => {
(*dump_info.write().await).as_mut().expect("Inconsistent dump service state").with_error(e.to_string());
dump_infos.with_error(e.to_string());
error!("Dump failed: {}", e);
}
Err(_) => {
dump_infos.with_error("Unexpected error while performing dump.".to_string());
error!("Dump panicked. Dump status set to failed");
*dump_info.write().await = Some(DumpInfo::new(uid, DumpStatus::Failed));
}
};
}
async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
match &*self.dump_info.read().await {
None => self.dump_from_fs(uid).await,
Some(DumpInfo { uid: ref s, .. }) if &uid != s => self.dump_from_fs(uid).await,
match self.dump_infos.read().await.get(&uid) {
Some(info) => Ok(info.clone()),
_ => Err(DumpError::DumpDoesNotExist(uid)),
}
}
async fn dump_from_fs(&self, uid: String) -> DumpResult<DumpInfo> {
self.dump_path
.join(format!("{}.dump", &uid))
.exists()
.then(|| DumpInfo::new(uid.clone(), DumpStatus::Done))
.ok_or(DumpError::DumpDoesNotExist(uid))
}
async fn is_running(&self) -> bool {
matches!(
*self.dump_info.read().await,
Some(DumpInfo {
status: DumpStatus::InProgress,
..
})
)
}
}
async fn perform_dump<UuidResolver, Index, Update>(
dump_path: PathBuf,
uuid_resolver: UuidResolver,
index: Index,
update: Update,
uid: String,
) -> anyhow::Result<()>
where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
{
info!("Performing dump.");
let dump_dir = dump_path.clone();
tokio::fs::create_dir_all(&dump_dir).await?;
let temp_dump_dir =
tokio::task::spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let uuids = uuid_resolver.list().await?;
// maybe we could just keep the vec as-is
let uuids: HashSet<(String, Uuid)> = uuids.into_iter().collect();
if uuids.is_empty() {
return Ok(());
}
let indexes = list_indexes(&uuid_resolver, &index).await?;
// we create one directory by index
for meta in indexes.iter() {
tokio::fs::create_dir(temp_dump_path.join(&meta.uid)).await?;
}
let metadata = super::Metadata::new(indexes, env!("CARGO_PKG_VERSION").to_string());
metadata.to_path(&temp_dump_path).await?;
update.dump(uuids, temp_dump_path.clone()).await?;
let dump_dir = dump_path.clone();
let dump_path = dump_path.join(format!("{}.dump", uid));
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
let temp_dump_file_path = temp_dump_file.path().to_owned();
compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
async fn list_indexes<UuidResolver, Index>(
uuid_resolver: &UuidResolver,
index: &Index,
) -> anyhow::Result<Vec<IndexMetadata>>
where
UuidResolver: uuid_resolver::UuidResolverHandle,
Index: index_actor::IndexActorHandle,
{
let uuids = uuid_resolver.list().await?;
let mut ret = Vec::new();
for (uid, uuid) in uuids {
let meta = index.get_index_meta(uuid).await?;
let meta = IndexMetadata {
uuid,
name: uid.clone(),
uid,
meta,
};
ret.push(meta);
}
Ok(ret)
}

View File

@ -1,6 +1,8 @@
use std::path::{Path};
use std::path::Path;
use actix_web::web::Bytes;
use tokio::sync::{mpsc, oneshot};
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
#[derive(Clone)]
@ -29,13 +31,22 @@ impl DumpActorHandleImpl {
pub fn new(
path: impl AsRef<Path>,
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
index: crate::index_controller::index_actor::IndexActorHandleImpl,
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
index_db_size: usize,
update_db_size: usize,
) -> anyhow::Result<Self> {
let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(receiver, uuid_resolver, index, update, path);
let actor = DumpActor::new(
receiver,
uuid_resolver,
update,
path,
index_db_size,
update_db_size,
);
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
}

View File

@ -0,0 +1,2 @@
pub mod v1;
pub mod v2;

View File

@ -1,12 +1,62 @@
use std::{collections::{BTreeMap, BTreeSet}, marker::PhantomData};
use std::collections::{BTreeMap, BTreeSet};
use std::fs::{create_dir_all, File};
use std::io::BufRead;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use log::warn;
use heed::EnvOpenOptions;
use log::{error, info, warn};
use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use crate::{index::Unchecked, index_controller};
use crate::index::deserialize_some;
use super::*;
use uuid::Uuid;
/// This is the settings used in the last version of meilisearch exporting dump in V1
use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata};
use crate::{
index::{deserialize_some, update_handler::UpdateHandler, Index, Unchecked},
option::IndexerOpts,
};
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
db_version: String,
indexes: Vec<IndexMetadata>,
}
impl MetadataV1 {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump, dump database version: {}, dump version: V1",
self.db_version
);
let uuid_store = HeedUuidStore::new(&dst)?;
for index in self.indexes {
let uuid = Uuid::new_v4();
uuid_store.insert(index.uid.clone(), uuid)?;
let src = src.as_ref().join(index.uid);
load_index(
&src,
&dst,
uuid,
index.meta.primary_key.as_deref(),
size,
indexer_options,
)?;
}
Ok(())
}
}
// These are the settings used in legacy meilisearch (<v0.21.0).
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct Settings {
@ -26,6 +76,60 @@ struct Settings {
pub attributes_for_faceting: Option<Option<Vec<String>>>,
}
fn load_index(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
uuid: Uuid,
primary_key: Option<&str>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
// extract `settings.json` file and import content
let settings = import_settings(&src)?;
let settings: index_controller::Settings<Unchecked> = settings.into();
let mut txn = index.write_txn()?;
let handler = UpdateHandler::new(&indexer_options)?;
index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?;
let file = File::open(&src.as_ref().join("documents.jsonl"))?;
let mut reader = std::io::BufReader::new(file);
reader.fill_buf()?;
if !reader.buffer().is_empty() {
index.update_documents_txn(
&mut txn,
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
handler.update_builder(0),
primary_key,
)?;
}
txn.commit()?;
// Finaly, we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "Couldn't close the index properly")
.unwrap()
.prepare_for_closing()
.wait();
// Updates are ignored in dumps V1.
Ok(())
}
/// we need to **always** be able to convert the old settings to the settings currently being used
impl From<Settings> for index_controller::Settings<Unchecked> {
fn from(settings: Settings) -> Self {
@ -69,54 +173,11 @@ impl From<Settings> for index_controller::Settings<Unchecked> {
}
/// Extract Settings from `settings.json` file present at provided `dir_path`
fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
let path = dir_path.join("settings.json");
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
let path = dir_path.as_ref().join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Importing a dump from an old version of meilisearch with dump version 1");
std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
// extract `settings.json` file and import content
let settings = import_settings(&dump_path)?;
let settings: index_controller::Settings<Unchecked> = settings.into();
let update_builder = UpdateBuilder::new(0);
index.update_settings(&settings.check(), update_builder)?;
let update_builder = UpdateBuilder::new(1);
let file = File::open(&dump_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
// TODO: TAMO: waiting for milli. We should use the result
let _ = index.update_documents(
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
update_builder,
primary_key,
);
// the last step: we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
.unwrap()
.prepare_for_closing()
.wait();
// at this point we should handle the import of the updates, but since the update logic is not handled in
// meilisearch we are just going to ignore this part
Ok(())
}

View File

@ -0,0 +1,59 @@
use std::path::Path;
use chrono::{DateTime, Utc};
use log::info;
use serde::{Deserialize, Serialize};
use crate::index::Index;
use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore};
use crate::option::IndexerOpts;
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV2 {
db_version: String,
index_db_size: usize,
update_db_size: usize,
dump_date: DateTime<Utc>,
}
impl MetadataV2 {
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
Self {
db_version: env!("CARGO_PKG_VERSION").to_string(),
index_db_size,
update_db_size,
dump_date: Utc::now(),
}
}
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V2",
self.dump_date, self.db_version
);
info!("Loading index database.");
HeedUuidStore::load_dump(src.as_ref(), &dst)?;
info!("Loading updates.");
UpdateStore::load_dump(&src, &dst, update_db_size)?;
info!("Loading indexes.");
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
for index in indexes {
let index = index?;
Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?;
}
Ok(())
}
}

View File

@ -1,7 +1,6 @@
use tokio::sync::oneshot;
use super::{DumpResult, DumpInfo};
use super::{DumpInfo, DumpResult};
pub enum DumpMsg {
CreateDump {
@ -12,4 +11,3 @@ pub enum DumpMsg {
ret: oneshot::Sender<DumpResult<DumpInfo>>,
},
}

View File

@ -1,31 +1,32 @@
mod actor;
mod handle_impl;
mod message;
mod v1;
mod v2;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::{fs::File, path::Path, sync::Arc};
use anyhow::bail;
use heed::EnvOpenOptions;
use log::{error, info};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use anyhow::Context;
use chrono::{DateTime, Utc};
use log::{error, info, warn};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use thiserror::Error;
use uuid::Uuid;
use tokio::fs::create_dir_all;
use super::IndexMetadata;
use crate::helpers::compression;
use crate::index::Index;
use crate::index_controller::uuid_resolver;
use loaders::v1::MetadataV1;
use loaders::v2::MetadataV2;
pub use actor::DumpActor;
pub use handle_impl::*;
pub use message::DumpMsg;
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
use crate::{helpers::compression, option::IndexerOpts};
mod actor;
mod handle_impl;
mod loaders;
mod message;
const META_FILE_NAME: &str = "metadata.json";
pub type DumpResult<T> = std::result::Result<T, DumpError>;
#[derive(Error, Debug)]
@ -40,31 +41,6 @@ pub enum DumpError {
DumpDoesNotExist(String),
}
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum DumpVersion {
V1,
V2,
}
impl DumpVersion {
const CURRENT: Self = Self::V2;
/// Select the good importation function from the `DumpVersion` of metadata
pub fn import_index(
self,
size: usize,
uuid: Uuid,
dump_path: &Path,
db_path: &Path,
primary_key: Option<&str>,
) -> anyhow::Result<()> {
match self {
Self::V1 => v1::import_index(size, uuid, dump_path, db_path, primary_key),
Self::V2 => v2::import_index(size, uuid, dump_path, db_path, primary_key),
}
}
}
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait DumpActorHandle {
@ -78,39 +54,16 @@ pub trait DumpActorHandle {
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
indexes: Vec<IndexMetadata>,
db_version: String,
dump_version: DumpVersion,
#[serde(tag = "dumpVersion")]
pub enum Metadata {
V1(MetadataV1),
V2(MetadataV2),
}
impl Metadata {
/// Create a Metadata with the current dump version of meilisearch.
pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
Metadata {
indexes,
db_version,
dump_version: DumpVersion::CURRENT,
}
}
/// Extract Metadata from `metadata.json` file present at provided `dir_path`
fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
let path = dir_path.join("metadata.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write Metadata in `metadata.json` file at provided `dir_path`
pub async fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> {
let path = dir_path.join("metadata.json");
tokio::fs::write(path, serde_json::to_string(self)?).await?;
Ok(())
pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self {
let meta = MetadataV2::new(index_db_size, update_db_size);
Self::V2(meta)
}
}
@ -129,6 +82,9 @@ pub struct DumpInfo {
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
started_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
finished_at: Option<DateTime<Utc>>,
}
impl DumpInfo {
@ -137,15 +93,19 @@ impl DumpInfo {
uid,
status,
error: None,
started_at: Utc::now(),
finished_at: None,
}
}
pub fn with_error(&mut self, error: String) {
self.status = DumpStatus::Failed;
self.finished_at = Some(Utc::now());
self.error = Some(error);
}
pub fn done(&mut self) {
self.finished_at = Some(Utc::now());
self.status = DumpStatus::Done;
}
@ -155,80 +115,100 @@ impl DumpInfo {
}
pub fn load_dump(
db_path: impl AsRef<Path>,
dump_path: impl AsRef<Path>,
size: usize,
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
info!("Importing dump from {}...", dump_path.as_ref().display());
let db_path = db_path.as_ref();
let dump_path = dump_path.as_ref();
let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?;
let tmp_src = tempfile::tempdir_in(".")?;
let tmp_src_path = tmp_src.path();
// extract the dump in a temporary directory
let tmp_dir = TempDir::new_in(db_path)?;
let tmp_dir_path = tmp_dir.path();
compression::from_tar_gz(dump_path, tmp_dir_path)?;
compression::from_tar_gz(&src_path, tmp_src_path)?;
// read dump metadata
let metadata = Metadata::from_path(&tmp_dir_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?;
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
// remove indexes which have same `uuid` than indexes to import and create empty indexes
let existing_index_uids = uuid_resolver.list()?;
let dst_dir = dst_path
.as_ref()
.parent()
.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
info!("Deleting indexes already present in the db and provided in the dump...");
for idx in &metadata.indexes {
if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
// if we find the index in the `uuid_resolver` it's supposed to exist on the file system
// and we want to delete it
let path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Deleting {}", path.display());
use std::io::ErrorKind::*;
match std::fs::remove_dir_all(path) {
Ok(()) => (),
// if an index was present in the metadata but missing of the fs we can ignore the
// problem because we are going to create it later
Err(e) if e.kind() == NotFound => (),
Err(e) => bail!(e),
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
match meta {
Metadata::V1(meta) => {
meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)?
}
} else {
// if the index does not exist in the `uuid_resolver` we create it
uuid_resolver.create_uuid(idx.uid.clone(), false)?;
Metadata::V2(meta) => meta.load_dump(
&tmp_src_path,
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?,
}
// Persist and atomically rename the db
let persisted_dump = tmp_dst.into_path();
if dst_path.as_ref().exists() {
warn!("Overwriting database at {}", dst_path.as_ref().display());
std::fs::remove_dir_all(&dst_path)?;
}
// import each indexes content
for idx in metadata.indexes {
let dump_path = tmp_dir_path.join(&idx.uid);
// this cannot fail since we created all the missing uuid in the previous loop
let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
std::fs::rename(&persisted_dump, &dst_path)?;
info!(
"Importing dump from {} into {}...",
dump_path.display(),
db_path.display()
);
metadata.dump_version.import_index(
size,
uuid,
&dump_path,
&db_path,
idx.meta.primary_key.as_ref().map(|s| s.as_ref()),
)?;
info!("Dump importation from {} succeed", dump_path.display());
}
// finally we can move all the unprocessed update file into our new DB
// this directory may not exists
let update_path = tmp_dir_path.join("update_files");
let db_update_path = db_path.join("updates/update_files");
if update_path.exists() {
let _ = std::fs::remove_dir_all(db_update_path);
std::fs::rename(
tmp_dir_path.join("update_files"),
db_path.join("updates/update_files"),
)?;
}
info!("Dump importation from {} succeed", dump_path.display());
Ok(())
}
struct DumpTask<U, P> {
path: PathBuf,
uuid_resolver: U,
update_handle: P,
uid: String,
update_db_size: usize,
index_db_size: usize,
}
impl<U, P> DumpTask<U, P>
where
U: UuidResolverHandle + Send + Sync + Clone + 'static,
P: UpdateActorHandle + Send + Sync + Clone + 'static,
{
async fn run(self) -> anyhow::Result<()> {
info!("Performing dump.");
create_dir_all(&self.path).await?;
let path_clone = self.path.clone();
let temp_dump_dir =
tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
self.update_handle
.dump(uuids, temp_dump_path.clone())
.await?;
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
let dump_path = self.path.join(self.uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
}

View File

@ -1,89 +0,0 @@
use heed::EnvOpenOptions;
use log::info;
use uuid::Uuid;
use crate::{index::Unchecked, index_controller::{UpdateStatus, update_actor::UpdateStore}};
use std::io::BufRead;
use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}};
use crate::index::{Checked, Index};
use crate::index_controller::Settings;
use std::{fs::File, path::Path, sync::Arc};
/// Extract Settings from `settings.json` file present at provided `dir_path`
fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
let path = dir_path.join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
println!("Meta: {:?}", metadata);
Ok(metadata.check())
}
pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
let mut txn = index.write_txn()?;
info!("importing the settings...");
// extract `settings.json` file and import content
let settings = import_settings(&dump_path)?;
let update_builder = UpdateBuilder::new(0);
index.update_settings_txn(&mut txn, &settings, update_builder)?;
// import the documents in the index
let update_builder = UpdateBuilder::new(1);
let file = File::open(&dump_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
info!("importing the documents...");
// TODO: TAMO: currently we ignore any error caused by the importation of the documents because
// if there is no documents nor primary key it'll throw an anyhow error, but we must remove
// this before the merge on main
index.update_documents_txn(
&mut txn,
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
update_builder,
primary_key,
)?;
txn.commit()?;
// the last step: we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
.unwrap()
.prepare_for_closing()
.wait();
info!("importing the updates...");
import_updates(uuid, dump_path, db_path)
}
fn import_updates(uuid: Uuid, dump_path: &Path, db_path: &Path) -> anyhow::Result<()> {
let update_path = db_path.join("updates");
let options = EnvOpenOptions::new();
// create an UpdateStore to import the updates
std::fs::create_dir_all(&update_path)?;
let (update_store, _) = UpdateStore::create(options, &update_path)?;
let file = File::open(&dump_path.join("updates.jsonl"))?;
let reader = std::io::BufReader::new(file);
let mut wtxn = update_store.env.write_txn()?;
for update in reader.lines() {
let mut update: UpdateStatus = serde_json::from_str(&update?)?;
if let Some(path) = update.content_path_mut() {
*path = update_path.join("update_files").join(&path);
}
update_store.register_raw_updates(&mut wtxn, update, uuid)?;
}
wtxn.commit()?;
Ok(())
}

View File

@ -6,14 +6,15 @@ use async_stream::stream;
use futures::stream::StreamExt;
use heed::CompactionOption;
use log::debug;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use tokio::{fs, sync::mpsc};
use uuid::Uuid;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index::{
update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings,
};
use crate::index_controller::{
get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed,
Processing,
get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing,
};
use crate::option::IndexerOpts;
@ -30,10 +31,14 @@ pub struct IndexActor<S> {
impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = UpdateHandler::new(&options)?;
let update_handler = Arc::new(update_handler);
let receiver = Some(receiver);
Ok(Self { receiver, update_handler, store })
Ok(Self {
receiver,
update_handler,
store,
})
}
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
@ -122,8 +127,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Dump { uid, uuid, path, ret } => {
let _ = ret.send(self.handle_dump(&uid, uuid, path).await);
Dump { uuid, path, ret } => {
let _ = ret.send(self.handle_dump(uuid, path).await);
}
GetStats { uuid, ret } => {
let _ = ret.send(self.handle_get_stats(uuid).await);
@ -146,9 +151,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
@ -165,9 +168,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
None => self.store.create(uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
let result =
spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?;
Ok(result)
}
async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
@ -176,9 +179,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.settings().map_err(IndexError::Error))
.await
.map_err(|e| IndexError::Error(e.into()))?
let result = spawn_blocking(move || index.settings()).await??;
Ok(result)
}
async fn handle_fetch_documents(
@ -193,13 +195,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
let result =
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
.await??;
Ok(result)
}
async fn handle_fetch_document(
@ -213,13 +213,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
let result =
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
.await??;
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> {
@ -242,9 +241,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
None => Err(IndexError::UnexistingIndex),
@ -262,7 +259,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || match index_settings.primary_key {
let result = spawn_blocking(move || match index_settings.primary_key {
Some(ref primary_key) => {
let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() {
@ -278,23 +275,22 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(meta)
}
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.await??;
Ok(result)
}
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
create_dir_all(&path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
create_dir_all(&path).await?;
if let Some(index) = self.store.get(uuid).await? {
let mut index_path = path.join(format!("index-{}", uuid));
create_dir_all(&index_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
create_dir_all(&index_path).await?;
index_path.push("data.mdb");
spawn_blocking(move || -> anyhow::Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot.
@ -304,9 +300,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(())
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
.await??;
}
Ok(())
@ -314,50 +308,17 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
/// documents and all the settings.
async fn handle_dump(&self, uid: &str, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
use std::io::prelude::*;
use tokio::fs::create_dir_all;
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
create_dir_all(&path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
let path = path.join(format!("indexes/index-{}/", uuid));
fs::create_dir_all(&path).await?;
if let Some(index) = self.store.get(uuid).await? {
let documents_path = path.join(uid).join("documents.jsonl");
let settings_path = path.join(uid).join("settings.json");
spawn_blocking(move || -> anyhow::Result<()> {
// first we dump all the documents
let file = File::create(documents_path)?;
let mut file = std::io::BufWriter::new(file);
// Get write txn to wait for ongoing write transaction before dump.
let txn = index.write_txn()?;
let fields_ids_map = index.fields_ids_map(&txn)?;
// we want to save **all** the fields in the dump.
let fields_to_dump: Vec<u8> = fields_ids_map.iter().map(|(id, _)| id).collect();
for document in index.all_documents(&txn)? {
let (_doc_id, document) = document?;
let json = milli::obkv_to_json(&fields_to_dump, &fields_ids_map, document)?;
file.write_all(serde_json::to_string(&json)?.as_bytes())?;
file.write_all(b"\n")?;
}
// then we dump all the settings
let file = File::create(settings_path)?;
let mut file = std::io::BufWriter::new(file);
let settings = index.settings()?;
file.write_all(serde_json::to_string(&settings)?.as_bytes())?;
file.write_all(b"\n")?;
Ok(())
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
}
tokio::task::spawn_blocking(move || index.dump(path)).await??;
Ok(())
}
@ -379,7 +340,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
fields_distribution: index.fields_distribution(&rtxn)?,
})
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.await?
}
}

View File

@ -3,7 +3,10 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::{index::Checked, index_controller::{IndexSettings, IndexStats, Processing}};
use crate::{
index::Checked,
index_controller::{IndexSettings, IndexStats, Processing},
};
use crate::{
index::{Document, SearchQuery, SearchResult, Settings},
index_controller::{Failed, Processed},
@ -136,9 +139,9 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Dump { uid, uuid, path, ret };
let msg = IndexMsg::Dump { uuid, path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

View File

@ -3,7 +3,7 @@ use std::path::PathBuf;
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult, Settings, Checked};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use super::{IndexMeta, IndexResult, IndexSettings};
@ -61,7 +61,6 @@ pub enum IndexMsg {
ret: oneshot::Sender<IndexResult<()>>,
},
Dump {
uid: String,
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<IndexResult<()>>,

View File

@ -15,7 +15,7 @@ use message::IndexMsg;
use store::{IndexStore, MapIndexStore};
use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{Failed, Processed, Processing, IndexStats};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use super::IndexSettings;
@ -44,24 +44,45 @@ impl IndexMeta {
let created_at = index.created_at(&txn)?;
let updated_at = index.updated_at(&txn)?;
let primary_key = index.primary_key(&txn)?.map(String::from);
Ok(Self { created_at, updated_at, primary_key })
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
#[derive(Error, Debug)]
pub enum IndexError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("index already exists")]
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("Existing primary key")]
ExistingPrimaryKey,
#[error("Internal Index Error: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for IndexError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
anyhow::Error,
heed::Error,
tokio::task::JoinError,
std::io::Error
);
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait IndexActorHandle {
@ -97,7 +118,7 @@ pub trait IndexActorHandle {
index_settings: IndexSettings,
) -> IndexResult<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats>;
}
@ -178,8 +199,8 @@ mod test {
self.as_ref().snapshot(uuid, path).await
}
async fn dump(&self, uid: String, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
self.as_ref().dump(uid, uuid, path).await
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
self.as_ref().dump(uuid, path).await
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {

View File

@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use heed::EnvOpenOptions;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
@ -48,7 +47,7 @@ impl IndexStore for MapIndexStore {
let index_size = self.index_size;
let index = spawn_blocking(move || -> IndexResult<Index> {
let index = open_index(&path, index_size)?;
let index = Index::open(path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
index.put_primary_key(&mut txn, &primary_key)?;
@ -56,8 +55,7 @@ impl IndexStore for MapIndexStore {
}
Ok(index)
})
.await
.map_err(|e| IndexError::Error(e.into()))??;
.await??;
self.index_store.write().await.insert(uuid, index.clone());
@ -77,9 +75,7 @@ impl IndexStore for MapIndexStore {
}
let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size))
.await
.map_err(|e| IndexError::Error(e.into()))??;
let index = spawn_blocking(move || Index::open(path, index_size)).await??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
@ -88,18 +84,8 @@ impl IndexStore for MapIndexStore {
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
fs::remove_dir_all(db_path).await?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> {
std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path).map_err(IndexError::Error)?;
Ok(Index(Arc::new(index)))
}

View File

@ -14,24 +14,23 @@ use tokio::sync::mpsc;
use tokio::time::sleep;
use uuid::Uuid;
pub use updates::*;
pub use dump_actor::{DumpInfo, DumpStatus};
use dump_actor::DumpActorHandle;
pub use dump_actor::{DumpInfo, DumpStatus};
use index_actor::IndexActorHandle;
use snapshot::{SnapshotService, load_snapshot};
use snapshot::{load_snapshot, SnapshotService};
use update_actor::UpdateActorHandle;
use uuid_resolver::{UuidError, UuidResolverHandle};
pub use updates::*;
use uuid_resolver::{UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt;
use dump_actor::load_dump;
use self::dump_actor::load_dump;
mod dump_actor;
mod index_actor;
mod snapshot;
mod dump_actor;
mod update_actor;
mod update_handler;
mod updates;
mod uuid_resolver;
@ -94,13 +93,14 @@ impl IndexController {
options.ignore_snapshot_if_db_exists,
options.ignore_missing_snapshot,
)?;
} else if let Some(ref path) = options.import_dump {
} else if let Some(ref src_path) = options.import_dump {
load_dump(
&options.db_path,
path,
index_size,
src_path,
options.max_mdb_size.get_bytes() as usize,
options.max_udb_size.get_bytes() as usize,
&options.indexer_options,
)?;
}
std::fs::create_dir_all(&path)?;
@ -112,7 +112,13 @@ impl IndexController {
&path,
update_store_size,
)?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(&options.dumps_dir, uuid_resolver.clone(), index_handle.clone(), update_handle.clone())?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(
&options.dumps_dir,
uuid_resolver.clone(),
update_handle.clone(),
options.max_mdb_size.get_bytes() as usize,
options.max_udb_size.get_bytes() as usize,
)?;
if options.schedule_snapshot {
let snapshot_service = SnapshotService::new(
@ -159,11 +165,6 @@ impl IndexController {
// registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move {
payload
.map(|bytes| {
bytes.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
})
})
.for_each(|r| async {
let _ = sender.send(r).await;
})
@ -176,7 +177,7 @@ impl IndexController {
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_update(uuid).await?),
Err(UuidError::UnexistingIndex(name)) => {
Err(UuidResolverError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?;
// ignore if index creation fails now, since it may already have been created
@ -230,7 +231,7 @@ impl IndexController {
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_udpate(uuid).await?),
Err(UuidError::UnexistingIndex(name)) if create => {
Err(UuidResolverError::UnexistingIndex(name)) if create => {
let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?;
// ignore if index creation fails now, since it may already have been created

View File

@ -144,7 +144,7 @@ mod test {
use crate::index_controller::update_actor::{
MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError,
};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidResolverError};
#[actix_rt::test]
async fn test_normal() {
@ -193,7 +193,7 @@ mod test {
.expect_snapshot()
.times(1)
// abitrary error
.returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));
.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();
@ -248,7 +248,7 @@ mod test {
// we expect the funtion to be called between 2 and 3 time in the given interval.
.times(2..4)
// abitrary error, to short-circuit the function
.returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));
.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();

View File

@ -11,7 +11,7 @@ use tokio::sync::mpsc;
use uuid::Uuid;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use crate::index_controller::index_actor::{IndexActorHandle};
use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, I> {
@ -42,7 +42,12 @@ where
let store = UpdateStore::open(options, &path, index_handle.clone())?;
std::fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists());
Ok(Self { path, store, inbox, index_handle })
Ok(Self {
path,
store,
inbox,
index_handle,
})
}
pub async fn run(mut self) {
@ -90,9 +95,7 @@ where
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> {
let file_path = match meta {
UpdateMeta::DocumentsAddition { .. }
| UpdateMeta::DeleteDocuments => {
UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => {
let update_file_id = uuid::Uuid::new_v4();
let path = self
.path
@ -102,39 +105,26 @@ where
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
.await?;
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
match bytes {
Ok(bytes) => {
let bytes = bytes?;
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref())
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
}
}
file.write_all(bytes.as_ref()).await?;
}
if file_len != 0 {
file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.flush().await?;
let file = file.into_std().await;
Some((file, path))
Some((file, update_file_id))
} else {
// empty update, delete the empty file.
fs::remove_file(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
fs::remove_file(&path).await?;
None
}
}
_ => None
_ => None,
};
let update_store = self.store.clone();
@ -143,52 +133,45 @@ where
use std::io::{copy, sink, BufReader, Seek};
// If the payload is empty, ignore the check.
let path = if let Some((mut file, path)) = file_path {
let update_uuid = if let Some((mut file, uuid)) = file_path {
// set the file back to the beginning
file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))?;
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message:
file.seek(SeekFrom::Start(0))
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))?;
let _: serde_json::Value = serde_json::from_reader(file)?;
}
Some(path)
Some(uuid)
} else {
None
};
// The payload is valid, we can register it to the update store.
update_store
.register_update(meta, path, uuid)
.map(UpdateStatus::Enqueued)
.map_err(|e| UpdateError::Error(Box::new(e)))
let status = update_store
.register_update(meta, update_uuid, uuid)
.map(UpdateStatus::Enqueued)?;
Ok(status)
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
.await?
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || {
let result = update_store
.list(uuid)
.map_err(|e| UpdateError::Error(e.into()))?;
let result = update_store.list(uuid)?;
Ok(result)
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
.await?
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self.store.clone();
let result = store
.meta(uuid, id)
.map_err(|e| UpdateError::Error(Box::new(e)))?
.meta(uuid, id)?
.ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result)
}
@ -196,10 +179,7 @@ where
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.clone();
tokio::task::spawn_blocking(move || store.delete_all(uuid))
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
Ok(())
}
@ -209,23 +189,21 @@ where
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
.await??;
Ok(())
}
async fn handle_dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> {
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(())
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
.await??;
Ok(())
}
@ -235,9 +213,7 @@ where
let info = update_store.get_info()?;
Ok(info)
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
.await??;
Ok(info)
}

View File

@ -71,7 +71,7 @@ where
receiver.await.expect("update actor killed.")
}
async fn dump(&self, uuids: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()> {
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Dump { uuids, path, ret };
let _ = self.sender.send(msg).await;

View File

@ -32,7 +32,7 @@ pub enum UpdateMsg<D> {
ret: oneshot::Sender<Result<()>>,
},
Dump {
uuids: HashSet<(String, Uuid)>,
uuids: HashSet<Uuid>,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},

View File

@ -1,10 +1,11 @@
mod actor;
mod handle_impl;
mod message;
mod update_store;
pub mod store;
use std::{collections::HashSet, path::PathBuf};
use actix_http::error::PayloadError;
use thiserror::Error;
use tokio::sync::mpsc;
use uuid::Uuid;
@ -14,23 +15,44 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor;
use message::UpdateMsg;
pub use update_store::{UpdateStore, UpdateStoreInfo};
pub use handle_impl::UpdateActorHandleImpl;
pub use store::{UpdateStore, UpdateStoreInfo};
pub type Result<T> = std::result::Result<T, UpdateError>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;
type PayloadData<D> = std::result::Result<D, PayloadError>;
#[cfg(test)]
use mockall::automock;
#[derive(Debug, Error)]
pub enum UpdateError {
#[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
#[error("Internal error processing update: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UpdateError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
std::io::Error,
serde_json::Error,
PayloadError,
tokio::task::JoinError,
anyhow::Error
);
#[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle {
@ -40,7 +62,7 @@ pub trait UpdateActorHandle {
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuid: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn dump(&self, uuid: HashSet<(String, Uuid)>, path: PathBuf) -> Result<()>;
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn update(
&self,

View File

@ -0,0 +1,86 @@
use std::{borrow::Cow, convert::TryInto, mem::size_of};
use heed::{BytesDecode, BytesEncode};
use uuid::Uuid;
pub struct NextIdCodec;
pub enum NextIdKey {
Global,
Index(Uuid),
}
impl<'a> BytesEncode<'a> for NextIdCodec {
type EItem = NextIdKey;
fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
match item {
NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
}
}
}
pub struct PendingKeyCodec;
impl<'a> BytesEncode<'a> for PendingKeyCodec {
type EItem = (u64, Uuid, u64);
fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(&global_id.to_be_bytes());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
type DItem = (u64, Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
let global_id = u64::from_be_bytes(global_id_bytes);
let uuid_bytes = bytes
.get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
.try_into()
.ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes
.get((size_of::<u64>() + size_of::<Uuid>())..)?
.try_into()
.ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((global_id, uuid, update_id))
}
}
pub struct UpdateKeyCodec;
impl<'a> BytesEncode<'a> for UpdateKeyCodec {
type EItem = (Uuid, u64);
fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
type DItem = (Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((uuid, update_id))
}
}

View File

@ -0,0 +1,189 @@
use std::{
collections::HashSet,
fs::{create_dir_all, File},
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
};
use heed::{EnvOpenOptions, RoTxn};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::UpdateStore;
use super::{codec::UpdateKeyCodec, State};
use crate::index_controller::{
index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
UpdateStatus,
};
#[derive(Serialize, Deserialize)]
struct UpdateEntry {
uuid: Uuid,
update: UpdateStatus,
}
impl UpdateStore {
pub fn dump(
&self,
uuids: &HashSet<Uuid>,
path: PathBuf,
handle: impl IndexActorHandle,
) -> anyhow::Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
// txn must *always* be acquired after state lock, or it will dead lock.
let txn = self.env.write_txn()?;
let dump_path = path.join("updates");
create_dir_all(&dump_path)?;
self.dump_updates(&txn, uuids, &dump_path)?;
let fut = dump_indexes(uuids, handle, &path);
tokio::runtime::Handle::current().block_on(fut)?;
state_lock.swap(State::Idle);
Ok(())
}
fn dump_updates(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
let dump_data_path = path.as_ref().join("data.jsonl");
let mut dump_data_file = File::create(dump_data_path)?;
let update_files_path = path.as_ref().join(super::UPDATE_DIR);
create_dir_all(&update_files_path)?;
self.dump_pending(&txn, uuids, &mut dump_data_file, &path)?;
self.dump_completed(&txn, uuids, &mut dump_data_file)?;
Ok(())
}
fn dump_pending(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: &mut File,
dst_path: impl AsRef<Path>,
) -> anyhow::Result<()> {
let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
for pending in pendings {
let ((_, uuid, _), data) = pending?;
if uuids.contains(&uuid) {
let update = data.decode()?;
if let Some(ref update_uuid) = update.content {
let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
std::fs::copy(src, dst)?;
}
let update_json = UpdateEntry {
uuid,
update: update.into(),
};
serde_json::to_writer(&mut file, &update_json)?;
file.write_all(b"\n")?;
}
}
Ok(())
}
fn dump_completed(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: &mut File,
) -> anyhow::Result<()> {
let updates = self
.updates
.iter(txn)?
.remap_key_type::<UpdateKeyCodec>()
.lazily_decode_data();
for update in updates {
let ((uuid, _), data) = update?;
if uuids.contains(&uuid) {
let update = data.decode()?;
let update_json = UpdateEntry { uuid, update };
serde_json::to_writer(&mut file, &update_json)?;
file.write_all(b"\n")?;
}
}
Ok(())
}
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
db_size: usize,
) -> anyhow::Result<()> {
let dst_update_path = dst.as_ref().join("updates/");
create_dir_all(&dst_update_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(db_size as usize);
let (store, _) = UpdateStore::new(options, &dst_update_path)?;
let src_update_path = src.as_ref().join("updates");
let update_data = File::open(&src_update_path.join("data.jsonl"))?;
let mut update_data = BufReader::new(update_data);
std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
let mut wtxn = store.env.write_txn()?;
let mut line = String::new();
loop {
match update_data.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
store.register_raw_updates(&mut wtxn, &update, uuid)?;
// Copy ascociated update path if it exists
if let UpdateStatus::Enqueued(Enqueued {
content: Some(uuid),
..
}) = update
{
let src = update_uuid_to_file_path(&src_update_path, uuid);
let dst = update_uuid_to_file_path(&dst_update_path, uuid);
std::fs::copy(src, dst)?;
}
}
_ => break,
}
line.clear();
}
wtxn.commit()?;
Ok(())
}
}
async fn dump_indexes(
uuids: &HashSet<Uuid>,
handle: impl IndexActorHandle,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
for uuid in uuids {
handle.dump(*uuid, path.as_ref().to_owned()).await?;
}
Ok(())
}

View File

@ -1,36 +1,35 @@
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
mod codec;
pub mod dump;
use std::fs::{copy, create_dir_all, remove_file, File};
use std::mem::size_of;
use std::path::Path;
use std::sync::Arc;
use std::{borrow::Cow, path::PathBuf};
use std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
};
use anyhow::Context;
use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use log::error;
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use uuid::Uuid;
use codec::*;
use super::UpdateMeta;
use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;
struct NextIdCodec;
enum NextIdKey {
Global,
Index(Uuid),
}
const UPDATE_DIR: &str = "update_files";
pub struct UpdateStoreInfo {
/// Size of the update store in bytes.
@ -45,13 +44,13 @@ pub struct StateLock {
data: ArcSwap<State>,
}
struct StateLockGuard<'a> {
pub struct StateLockGuard<'a> {
_lock: MutexGuard<'a, ()>,
state: &'a StateLock,
}
impl StateLockGuard<'_> {
fn swap(&self, state: State) -> Arc<State> {
pub fn swap(&self, state: State) -> Arc<State> {
self.state.data.swap(Arc::new(state))
}
}
@ -63,11 +62,11 @@ impl StateLock {
Self { lock, data }
}
fn read(&self) -> Arc<State> {
pub fn read(&self) -> Arc<State> {
self.data.load().clone()
}
fn write(&self) -> StateLockGuard {
pub fn write(&self) -> StateLockGuard {
let _lock = self.lock.lock();
let state = &self;
StateLockGuard { _lock, state }
@ -82,81 +81,6 @@ pub enum State {
Dumping,
}
impl<'a> BytesEncode<'a> for NextIdCodec {
type EItem = NextIdKey;
fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
match item {
NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
}
}
}
struct PendingKeyCodec;
impl<'a> BytesEncode<'a> for PendingKeyCodec {
type EItem = (u64, Uuid, u64);
fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(&global_id.to_be_bytes());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
type DItem = (u64, Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
let global_id = u64::from_be_bytes(global_id_bytes);
let uuid_bytes = bytes
.get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
.try_into()
.ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes
.get((size_of::<u64>() + size_of::<Uuid>())..)?
.try_into()
.ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((global_id, uuid, update_id))
}
}
struct UpdateKeyCodec;
impl<'a> BytesEncode<'a> for UpdateKeyCodec {
type EItem = (Uuid, u64);
fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
type DItem = (Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((uuid, update_id))
}
}
#[derive(Clone)]
pub struct UpdateStore {
pub env: Env,
@ -174,19 +98,20 @@ pub struct UpdateStore {
/// | 16-bytes | 8-bytes |
updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
/// Indicates the current state of the update store,
state: Arc<StateLock>,
pub state: Arc<StateLock>,
/// Wake up the loop when a new event occurs.
notification_sender: mpsc::Sender<()>,
path: PathBuf,
}
impl UpdateStore {
pub fn create(
fn new(
mut options: EnvOpenOptions,
path: impl AsRef<Path>,
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
options.max_dbs(5);
let env = options.open(path)?;
let env = options.open(&path)?;
let pending_queue = env.create_database(Some("pending-queue"))?;
let next_update_id = env.create_database(Some("next-update-id"))?;
let updates = env.create_database(Some("updates"))?;
@ -194,8 +119,6 @@ impl UpdateStore {
let state = Arc::new(StateLock::from_state(State::Idle));
let (notification_sender, notification_receiver) = mpsc::channel(10);
// Send a first notification to trigger the process.
let _ = notification_sender.send(());
Ok((
Self {
@ -205,6 +128,7 @@ impl UpdateStore {
updates,
state,
notification_sender,
path: path.as_ref().to_owned(),
},
notification_receiver,
))
@ -215,9 +139,12 @@ impl UpdateStore {
path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::create(options, path)?;
let (update_store, mut notification_receiver) = Self::new(options, path)?;
let update_store = Arc::new(update_store);
// Send a first notification to trigger the process.
let _ = update_store.notification_sender.send(());
// Init update loop to perform any pending updates at launch.
// Since we just launched the update store, and we still own the receiving end of the
// channel, this call is guaranteed to succeed.
@ -296,13 +223,13 @@ impl UpdateStore {
pub fn register_update(
&self,
meta: UpdateMeta,
content: Option<impl AsRef<Path>>,
content: Option<Uuid>,
index_uuid: Uuid,
) -> heed::Result<Enqueued> {
let mut txn = self.env.write_txn()?;
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
let meta = Enqueued::new(meta, update_id, content);
self.pending_queue
.put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@ -320,7 +247,7 @@ impl UpdateStore {
pub fn register_raw_updates(
&self,
wtxn: &mut heed::RwTxn,
update: UpdateStatus,
update: &UpdateStatus,
index_uuid: Uuid,
) -> heed::Result<()> {
match update {
@ -364,13 +291,14 @@ impl UpdateStore {
let processing = pending.processing();
// Acquire the state lock and set the current state to processing.
// txn must *always* be acquired after state lock, or it will dead lock.
let state = self.state.write();
state.swap(State::Processing(index_uuid, processing.clone()));
let file = match content_path {
Some(ref path) => {
let file = File::open(path)
.with_context(|| format!("file at path: {:?}", &content_path))?;
Some(uuid) => {
let path = update_uuid_to_file_path(&self.path, uuid);
let file = File::open(path)?;
Some(file)
}
None => None,
@ -386,7 +314,8 @@ impl UpdateStore {
self.pending_queue
.delete(&mut wtxn, &(global_id, index_uuid, update_id))?;
if let Some(path) = content_path {
if let Some(uuid) = content_path {
let path = update_uuid_to_file_path(&self.path, uuid);
remove_file(&path)?;
}
@ -486,7 +415,7 @@ impl UpdateStore {
pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
let mut txn = self.env.write_txn()?;
// Contains all the content file paths that we need to be removed if the deletion was successful.
let mut paths_to_remove = Vec::new();
let mut uuids_to_remove = Vec::new();
let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
@ -494,8 +423,8 @@ impl UpdateStore {
if uuid == index_uuid {
pendings.del_current()?;
let mut pending = pending.decode()?;
if let Some(path) = pending.content.take() {
paths_to_remove.push(path);
if let Some(update_uuid) = pending.content.take() {
uuids_to_remove.push(update_uuid);
}
}
}
@ -515,7 +444,10 @@ impl UpdateStore {
txn.commit()?;
paths_to_remove.iter().for_each(|path| {
uuids_to_remove
.iter()
.map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
.for_each(|path| {
let _ = remove_file(path);
});
@ -546,7 +478,7 @@ impl UpdateStore {
// create db snapshot
self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files");
let update_files_path = update_path.join(UPDATE_DIR);
create_dir_all(&update_files_path)?;
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
@ -554,10 +486,13 @@ impl UpdateStore {
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) {
if let Some(path) = pending.decode()?.content_path() {
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
if let Enqueued {
content: Some(uuid),
..
} = pending.decode()?
{
let path = update_uuid_to_file_path(&self.path, uuid);
copy(path, &update_files_path)?;
}
}
}
@ -580,85 +515,17 @@ impl UpdateStore {
Ok(())
}
pub fn dump(
&self,
uuids: &HashSet<(String, Uuid)>,
path: PathBuf,
handle: impl IndexActorHandle,
) -> anyhow::Result<()> {
use std::io::prelude::*;
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
let txn = self.env.write_txn()?;
for (index_uid, index_uuid) in uuids.iter() {
let file = File::create(path.join(index_uid).join("updates.jsonl"))?;
let mut file = std::io::BufWriter::new(file);
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if &uuid == index_uuid {
let mut update: UpdateStatus = pending.decode()?.into();
if let Some(path) = update.content_path_mut() {
*path = path.file_name().expect("update path can't be empty").into();
}
serde_json::to_writer(&mut file, &update)?;
file.write_all(b"\n")?;
}
}
let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
for entry in updates {
let (_, update) = entry?;
let mut update = update.clone();
if let Some(path) = update.content_path_mut() {
*path = path.file_name().expect("update path can't be empty").into();
}
serde_json::to_writer(&mut file, &update)?;
file.write_all(b"\n")?;
}
}
let update_files_path = path.join("update_files");
create_dir_all(&update_files_path)?;
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if uuids.iter().any(|(_, id)| id == &uuid) {
if let Some(path) = pending.decode()?.content_path() {
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
}
}
}
// Perform the dump of each index concurently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
let path = &path;
let mut stream = futures::stream::iter(uuids.iter())
.map(|(uid, uuid)| handle.dump(uid.clone(), *uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(())
})
}
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
let mut size = self.env.size();
let txn = self.env.read_txn()?;
for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?;
if let Some(path) = pending.content_path() {
if let Enqueued {
content: Some(uuid),
..
} = pending
{
let path = update_uuid_to_file_path(&self.path, uuid);
size += File::open(path)?.metadata()?.len();
}
}
@ -671,6 +538,12 @@ impl UpdateStore {
}
}
fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
root.as_ref()
.join(UPDATE_DIR)
.join(format!("update_{}", uuid))
}
#[cfg(test)]
mod test {
use super::*;
@ -716,9 +589,7 @@ mod test {
let uuid = Uuid::new_v4();
let store_clone = update_store.clone();
tokio::task::spawn_blocking(move || {
store_clone
.register_update(meta, Some("here"), uuid)
.unwrap();
store_clone.register_update(meta, None, uuid).unwrap();
})
.await
.unwrap();

View File

@ -1,8 +1,7 @@
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::index::{Checked, Settings};
@ -34,11 +33,11 @@ pub struct Enqueued {
pub update_id: u64,
pub meta: UpdateMeta,
pub enqueued_at: DateTime<Utc>,
pub content: Option<PathBuf>,
pub content: Option<Uuid>,
}
impl Enqueued {
pub fn new(meta: UpdateMeta, update_id: u64, content: Option<PathBuf>) -> Self {
pub fn new(meta: UpdateMeta, update_id: u64, content: Option<Uuid>) -> Self {
Self {
enqueued_at: Utc::now(),
meta,
@ -68,14 +67,6 @@ impl Enqueued {
pub fn id(&self) -> u64 {
self.update_id
}
pub fn content_path(&self) -> Option<&Path> {
self.content.as_deref()
}
pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
self.content.as_mut()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -91,14 +82,6 @@ impl Processed {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn content_path(&self) -> Option<&Path> {
self.from.content_path()
}
pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
self.from.content_path_mut()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -118,14 +101,6 @@ impl Processing {
self.from.meta()
}
pub fn content_path(&self) -> Option<&Path> {
self.from.content_path()
}
pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
self.from.content_path_mut()
}
pub fn process(self, success: UpdateResult) -> Processed {
Processed {
success,
@ -155,14 +130,6 @@ impl Aborted {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn content_path(&self) -> Option<&Path> {
self.from.content_path()
}
pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
self.from.content_path_mut()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -178,14 +145,6 @@ impl Failed {
pub fn id(&self) -> u64 {
self.from.id()
}
pub fn content_path(&self) -> Option<&Path> {
self.from.content_path()
}
pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
self.from.content_path_mut()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -215,26 +174,6 @@ impl UpdateStatus {
_ => None,
}
}
pub fn content_path(&self) -> Option<&Path> {
match self {
UpdateStatus::Processing(u) => u.content_path(),
UpdateStatus::Processed(u) => u.content_path(),
UpdateStatus::Aborted(u) => u.content_path(),
UpdateStatus::Failed(u) => u.content_path(),
UpdateStatus::Enqueued(u) => u.content_path(),
}
}
pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
match self {
UpdateStatus::Processing(u) => u.content_path_mut(),
UpdateStatus::Processed(u) => u.content_path_mut(),
UpdateStatus::Aborted(u) => u.content_path_mut(),
UpdateStatus::Failed(u) => u.content_path_mut(),
UpdateStatus::Enqueued(u) => u.content_path_mut(),
}
}
}
impl From<Enqueued> for UpdateStatus {

View File

@ -4,7 +4,7 @@ use log::{info, warn};
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{Result, UuidError, UuidResolveMsg, UuidStore};
use super::{Result, UuidResolveMsg, UuidResolverError, UuidStore};
pub struct UuidResolverActor<S> {
inbox: mpsc::Receiver<UuidResolveMsg>,
@ -44,6 +44,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(GetSize { ret }) => {
let _ = ret.send(self.handle_get_size().await);
}
Some(DumpRequest { path, ret }) => {
let _ = ret.send(self.handle_dump(path).await);
}
// all senders have been dropped, need to quit.
None => break,
}
@ -54,7 +57,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
@ -63,14 +66,14 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store
.get_uuid(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
.ok_or(UuidResolverError::UnexistingIndex(uid))
}
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
self.store
.delete(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
.ok_or(UuidResolverError::UnexistingIndex(uid))
}
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
@ -82,9 +85,13 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store.snapshot(path).await
}
async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
self.store.dump(path).await
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.insert(uid, uuid).await?;
Ok(())

View File

@ -85,4 +85,12 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::DumpRequest { ret, path };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
}

View File

@ -34,4 +34,8 @@ pub enum UuidResolveMsg {
GetSize {
ret: oneshot::Sender<Result<u64>>,
},
DumpRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
},
}

View File

@ -1,7 +1,7 @@
mod actor;
mod handle_impl;
mod message;
mod store;
pub mod store;
use std::collections::HashSet;
use std::path::PathBuf;
@ -16,12 +16,12 @@ use store::UuidStore;
#[cfg(test)]
use mockall::automock;
pub use store::HeedUuidStore;
pub use handle_impl::UuidResolverHandleImpl;
pub use store::HeedUuidStore;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
pub type Result<T> = std::result::Result<T, UuidError>;
pub type Result<T> = std::result::Result<T, UuidResolverError>;
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
@ -33,20 +33,37 @@ pub trait UuidResolverHandle {
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Debug, Error)]
pub enum UuidError {
pub enum UuidResolverError {
#[error("Name already exist.")]
NameAlreadyExist,
#[error("Index \"{0}\" doesn't exist.")]
UnexistingIndex(String),
#[error("Error performing task: {0}")]
TokioTask(#[from] tokio::task::JoinError),
#[error("Database error: {0}")]
Heed(#[from] heed::Error),
#[error("Uuid error: {0}")]
Uuid(#[from] uuid::Error),
#[error("Badly formatted index uid: {0}")]
BadlyFormatted(String),
#[error("Internal error resolving index uid: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UuidResolverError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
uuid::Error,
std::io::Error,
tokio::task::JoinError,
serde_json::Error
);

View File

@ -1,18 +1,26 @@
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use heed::{
types::{ByteSlice, Str},
CompactionOption, Database, Env, EnvOpenOptions,
};
use heed::types::{ByteSlice, Str};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{Result, UuidError, UUID_STORE_SIZE};
use super::{Result, UuidResolverError, UUID_STORE_SIZE};
use crate::helpers::EnvSizer;
#[derive(Serialize, Deserialize)]
struct DumpEntry {
uuid: Uuid,
uid: String,
}
const UUIDS_DB_PATH: &str = "index_uuids";
#[async_trait::async_trait]
pub trait UuidStore {
pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
@ -22,6 +30,7 @@ pub trait UuidStore {
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Clone)]
@ -32,7 +41,7 @@ pub struct HeedUuidStore {
impl HeedUuidStore {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref().join("index_uuids");
let path = path.as_ref().join(UUIDS_DB_PATH);
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
@ -48,7 +57,7 @@ impl HeedUuidStore {
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidError::NameAlreadyExist)
Err(UuidResolverError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
@ -62,7 +71,6 @@ impl HeedUuidStore {
}
}
}
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
@ -127,7 +135,7 @@ impl HeedUuidStore {
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push("index_uuids");
path.push(UUIDS_DB_PATH);
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
@ -138,6 +146,61 @@ impl HeedUuidStore {
pub fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
}
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let dump_path = path.join(UUIDS_DB_PATH);
create_dir_all(&dump_path)?;
let dump_file_path = dump_path.join("data.jsonl");
let mut dump_file = File::create(&dump_file_path)?;
let mut uuids = HashSet::new();
let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? {
let (uid, uuid) = entry?;
let uid = uid.to_string();
let uuid = Uuid::from_slice(uuid)?;
let entry = DumpEntry { uuid, uid };
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write_all(b"\n").unwrap();
uuids.insert(uuid);
}
Ok(uuids)
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
std::fs::create_dir_all(&uuid_resolver_path)?;
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);
let mut line = String::new();
let db = Self::new(dst)?;
let mut txn = db.env.write_txn()?;
loop {
match indexes.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
println!("importing {} {}", uid, uuid);
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
}
Err(e) => return Err(e.into()),
}
line.clear();
}
txn.commit()?;
db.env.prepare_for_closing().wait();
Ok(())
}
}
#[async_trait::async_trait]
@ -175,4 +238,9 @@ impl UuidStore for HeedUuidStore {
async fn get_size(&self) -> Result<u64> {
self.get_size()
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.dump(path)).await?
}
}

View File

@ -66,7 +66,7 @@ macro_rules! create_app {
.allowed_headers(vec!["content-type", "x-meili-api-key"])
.allow_any_origin()
.allow_any_method()
.max_age(86_400) // 24h
.max_age(86_400), // 24h
)
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())

View File

@ -1,20 +1,17 @@
use actix_web::{post, get, web};
use actix_web::HttpResponse;
use serde::{Serialize, Deserialize};
use actix_web::{get, post, web};
use serde::{Deserialize, Serialize};
use crate::error::ResponseError;
use crate::helpers::Authentication;
use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(create_dump)
.service(get_dump_status);
cfg.service(create_dump).service(get_dump_status);
}
#[post("/dumps", wrap = "Authentication::Private")]
async fn create_dump(
data: web::Data<Data>,
) -> Result<HttpResponse, ResponseError> {
async fn create_dump(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
let res = data.create_dump().await?;
Ok(HttpResponse::Accepted().json(res))

View File

@ -1,7 +1,7 @@
use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};
use crate::error::ResponseError;
use crate::helpers::Authentication;

View File

@ -2,6 +2,7 @@ use actix_web::{get, HttpResponse};
use serde::{Deserialize, Serialize};
pub mod document;
pub mod dump;
pub mod health;
pub mod index;
pub mod key;
@ -9,7 +10,6 @@ pub mod search;
pub mod settings;
pub mod stats;
pub mod synonym;
pub mod dump;
#[derive(Deserialize)]
pub struct IndexParam {

View File

@ -1,9 +1,9 @@
use actix_web::{delete, get, post, web, HttpResponse};
use crate::{error::ResponseError, index::Unchecked};
use crate::helpers::Authentication;
use crate::index::Settings;
use crate::Data;
use crate::{error::ResponseError, index::Unchecked};
#[macro_export]
macro_rules! make_setting_route {