158: Implements the dumps r=irevoire a=irevoire

closes #20

divergence from legacy meilisearch:
- dump v2 added, support loading of pending updates (only works dumps created from v2)
- added time stamps to the dump info
- Dump info are only persisted in an internal data structure, and they are not fetched from fs on demand anymore. This was a potential security flaw. This means that the dump infos are flushed on every restart.

Co-authored-by: tamo <tamo@meilisearch.com>
Co-authored-by: Marin Postma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2021-06-02 12:06:47 +00:00 committed by GitHub
commit 509a56a43d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
48 changed files with 1954 additions and 990 deletions

6
Cargo.lock generated
View File

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix-codec"
version = "0.4.0"
@ -1840,8 +1842,8 @@ dependencies = [
[[package]]
name = "milli"
version = "0.2.0"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.2.0#792225eaffce6b3682f9b30b7370b6a547c4757e"
version = "0.2.1"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.2.1#25f75d4d03732131e6edcf20f4d126210b159d43"
dependencies = [
"anyhow",
"bstr",

View File

@ -51,7 +51,7 @@ main_error = "0.1.0"
meilisearch-error = { path = "../meilisearch-error" }
meilisearch-tokenizer = { git = "https://github.com/meilisearch/Tokenizer.git", tag = "v0.2.2" }
memmap = "0.7.0"
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.2.0" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.2.1" }
mime = "0.3.16"
once_cell = "1.5.2"
oxidized-json-checker = "0.3.2"

View File

@ -50,7 +50,7 @@ mod mini_dashboard {
sha1_file.read_to_string(&mut sha1)?;
if sha1 == meta["sha1"].as_str().unwrap() {
// Nothing to do.
return Ok(())
return Ok(());
}
}
@ -62,7 +62,11 @@ mod mini_dashboard {
hasher.update(&dashboard_assets_bytes);
let sha1 = hex::encode(hasher.finalize());
assert_eq!(meta["sha1"].as_str().unwrap(), sha1, "Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml");
assert_eq!(
meta["sha1"].as_str().unwrap(),
sha1,
"Downloaded mini-dashboard shasum differs from the one specified in the Cargo.toml"
);
create_dir_all(&dashboard_dir)?;
let cursor = Cursor::new(&dashboard_assets_bytes);

View File

@ -4,8 +4,9 @@ use std::sync::Arc;
use sha2::Digest;
use crate::index::{Checked, Settings};
use crate::index_controller::{IndexController, IndexStats, Stats};
use crate::index_controller::{IndexMetadata, IndexSettings};
use crate::index_controller::{
DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats,
};
use crate::option::Opt;
pub mod search;
@ -68,7 +69,11 @@ impl Data {
api_keys.generate_missing_api_keys();
let inner = DataInner { index_controller, api_keys, options };
let inner = DataInner {
index_controller,
api_keys,
options,
};
let inner = Arc::new(inner);
Ok(Data { inner })
@ -108,6 +113,14 @@ impl Data {
Ok(self.index_controller.get_all_stats().await?)
}
pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
Ok(self.index_controller.create_dump().await?)
}
pub async fn dump_status(&self, uid: String) -> anyhow::Result<DumpInfo> {
Ok(self.index_controller.dump_info(uid).await?)
}
#[inline]
pub fn http_payload_size_limit(&self) -> usize {
self.options.http_payload_size_limit.get_bytes() as usize

View File

@ -1,423 +0,0 @@
use std::fs::{create_dir_all, File};
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;
use actix_web::web;
use chrono::offset::Utc;
use indexmap::IndexMap;
use log::{error, info};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tempfile::TempDir;
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::helpers::compression;
use crate::routes::index;
use crate::routes::setting::Settings;
use crate::routes::index::IndexResponse;
// Mutex to share dump progress.
static DUMP_INFO: Lazy<Mutex<Option<DumpInfo>>> = Lazy::new(Mutex::default);
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum DumpVersion {
V1,
}
impl DumpVersion {
const CURRENT: Self = Self::V1;
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DumpMetadata {
indexes: Vec<crate::routes::index::IndexResponse>,
db_version: String,
dump_version: DumpVersion,
}
impl DumpMetadata {
/// Create a DumpMetadata with the current dump version of meilisearch.
pub fn new(indexes: Vec<crate::routes::index::IndexResponse>, db_version: String) -> Self {
DumpMetadata {
indexes,
db_version,
dump_version: DumpVersion::CURRENT,
}
}
/// Extract DumpMetadata from `metadata.json` file present at provided `dir_path`
fn from_path(dir_path: &Path) -> Result<Self, Error> {
let path = dir_path.join("metadata.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write DumpMetadata in `metadata.json` file at provided `dir_path`
fn to_path(&self, dir_path: &Path) -> Result<(), Error> {
let path = dir_path.join("metadata.json");
let file = File::create(path)?;
serde_json::to_writer(file, &self)?;
Ok(())
}
}
/// Extract Settings from `settings.json` file present at provided `dir_path`
fn settings_from_path(dir_path: &Path) -> Result<Settings, Error> {
let path = dir_path.join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write Settings in `settings.json` file at provided `dir_path`
fn settings_to_path(settings: &Settings, dir_path: &Path) -> Result<(), Error> {
let path = dir_path.join("settings.json");
let file = File::create(path)?;
serde_json::to_writer(file, settings)?;
Ok(())
}
/// Import settings and documents of a dump with version `DumpVersion::V1` in specified index.
fn import_index_v1(
data: &Data,
dumps_dir: &Path,
index_uid: &str,
document_batch_size: usize,
write_txn: &mut MainWriter,
) -> Result<(), Error> {
// open index
let index = data
.db
.open_index(index_uid)
.ok_or(Error::index_not_found(index_uid))?;
// index dir path in dump dir
let index_path = &dumps_dir.join(index_uid);
// extract `settings.json` file and import content
let settings = settings_from_path(&index_path)?;
let settings = settings.to_update().map_err(|e| Error::dump_failed(format!("importing settings for index {}; {}", index_uid, e)))?;
apply_settings_update(write_txn, &index, settings)?;
// create iterator over documents in `documents.jsonl` to make batch importation
// create iterator over documents in `documents.jsonl` to make batch importation
let documents = {
let file = File::open(&index_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
let deserializer = serde_json::Deserializer::from_reader(reader);
deserializer.into_iter::<IndexMap<String, serde_json::Value>>()
};
// batch import document every `document_batch_size`:
// create a Vec to bufferize documents
let mut values = Vec::with_capacity(document_batch_size);
// iterate over documents
for document in documents {
// push document in buffer
values.push(document?);
// if buffer is full, create and apply a batch, and clean buffer
if values.len() == document_batch_size {
let batch = std::mem::replace(&mut values, Vec::with_capacity(document_batch_size));
apply_documents_addition(write_txn, &index, batch)?;
}
}
// apply documents remaining in the buffer
if !values.is_empty() {
apply_documents_addition(write_txn, &index, values)?;
}
// sync index information: stats, updated_at, last_update
if let Err(e) = crate::index_update_callback_txn(index, index_uid, data, write_txn) {
return Err(Error::Internal(e));
}
Ok(())
}
/// Import dump from `dump_path` in database.
pub fn import_dump(
data: &Data,
dump_path: &Path,
document_batch_size: usize,
) -> Result<(), Error> {
info!("Importing dump from {:?}...", dump_path);
// create a temporary directory
let tmp_dir = TempDir::new()?;
let tmp_dir_path = tmp_dir.path();
// extract dump in temporary directory
compression::from_tar_gz(dump_path, tmp_dir_path)?;
// read dump metadata
let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
// choose importation function from DumpVersion of metadata
let import_index = match metadata.dump_version {
DumpVersion::V1 => import_index_v1,
};
// remove indexes which have same `uid` than indexes to import and create empty indexes
let existing_index_uids = data.db.indexes_uids();
for index in metadata.indexes.iter() {
if existing_index_uids.contains(&index.uid) {
data.db.delete_index(index.uid.clone())?;
}
index::create_index_sync(&data.db, index.uid.clone(), index.name.clone(), index.primary_key.clone())?;
}
// import each indexes content
data.db.main_write::<_, _, Error>(|mut writer| {
for index in metadata.indexes {
import_index(&data, tmp_dir_path, &index.uid, document_batch_size, &mut writer)?;
}
Ok(())
})?;
info!("Dump importation from {:?} succeed", dump_path);
Ok(())
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DumpStatus {
Done,
InProgress,
Failed,
}
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DumpInfo {
pub uid: String,
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none", flatten)]
pub error: Option<serde_json::Value>,
}
impl DumpInfo {
pub fn new(uid: String, status: DumpStatus) -> Self {
Self { uid, status, error: None }
}
pub fn with_error(mut self, error: ResponseError) -> Self {
self.status = DumpStatus::Failed;
self.error = Some(json!(error));
self
}
pub fn dump_already_in_progress(&self) -> bool {
self.status == DumpStatus::InProgress
}
pub fn get_current() -> Option<Self> {
DUMP_INFO.lock().unwrap().clone()
}
pub fn set_current(&self) {
*DUMP_INFO.lock().unwrap() = Some(self.clone());
}
}
/// Generate uid from creation date
fn generate_uid() -> String {
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
}
/// Infer dumps_dir from dump_uid
pub fn compressed_dumps_dir(dumps_dir: &Path, dump_uid: &str) -> PathBuf {
dumps_dir.join(format!("{}.dump", dump_uid))
}
/// Write metadata in dump
fn dump_metadata(data: &web::Data<Data>, dir_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
let (db_major, db_minor, db_patch) = data.db.version();
let metadata = DumpMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));
metadata.to_path(dir_path)
}
/// Export settings of provided index in dump
fn dump_index_settings(data: &web::Data<Data>, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;
settings_to_path(&settings, dir_path)
}
/// Export updates of provided index in dump
fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
let updates_path = dir_path.join("updates.jsonl");
let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;
let file = File::create(updates_path)?;
for update in updates {
serde_json::to_writer(&file, &update)?;
writeln!(&file)?;
}
Ok(())
}
/// Export documents of provided index in dump
fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
let documents_path = dir_path.join("documents.jsonl");
let file = File::create(documents_path)?;
let dump_batch_size = data.dump_batch_size;
let mut offset = 0;
loop {
let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, dump_batch_size, None)?;
if documents.is_empty() { break; } else { offset += dump_batch_size; }
for document in documents {
serde_json::to_writer(&file, &document)?;
writeln!(&file)?;
}
}
Ok(())
}
/// Write error with a context.
fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) {
let error_message = format!("{}; {}", context, error);
error!("Something went wrong during dump process: {}", &error_message);
dump_info.with_error(Error::dump_failed(error_message).into()).set_current();
}
/// Main function of dump.
fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo) {
// open read transaction on Update
let update_reader = match data.db.update_read_txn() {
Ok(r) => r,
Err(e) => {
fail_dump_process(dump_info, "creating RO transaction on updates", e);
return ;
}
};
// open read transaction on Main
let main_reader = match data.db.main_read_txn() {
Ok(r) => r,
Err(e) => {
fail_dump_process(dump_info, "creating RO transaction on main", e);
return ;
}
};
// create a temporary directory
let tmp_dir = match TempDir::new() {
Ok(tmp_dir) => tmp_dir,
Err(e) => {
fail_dump_process(dump_info, "creating temporary directory", e);
return ;
}
};
let tmp_dir_path = tmp_dir.path();
// fetch indexes
let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) {
Ok(indexes) => indexes,
Err(e) => {
fail_dump_process(dump_info, "listing indexes", e);
return ;
}
};
// create metadata
if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) {
fail_dump_process(dump_info, "generating metadata", e);
return ;
}
// export settings, updates and documents for each indexes
for index in indexes {
let index_path = tmp_dir_path.join(&index.uid);
// create index sub-dircetory
if let Err(e) = create_dir_all(&index_path) {
fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e);
return ;
}
// export settings
if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) {
fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e);
return ;
}
// export documents
if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) {
fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e);
return ;
}
// export updates
if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) {
fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e);
return ;
}
}
// compress dump in a file named `{dump_uid}.dump` in `dumps_dir`
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_dir(&dumps_dir, &dump_info.uid)) {
fail_dump_process(dump_info, "compressing dump", e);
return ;
}
// update dump info to `done`
let resume = DumpInfo::new(
dump_info.uid,
DumpStatus::Done
);
resume.set_current();
}
pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<DumpInfo, Error> {
create_dir_all(dumps_dir).map_err(|e| Error::dump_failed(format!("creating temporary directory {}", e)))?;
// check if a dump is already in progress
if let Some(resume) = DumpInfo::get_current() {
if resume.dump_already_in_progress() {
return Err(Error::dump_conflict())
}
}
// generate a new dump info
let info = DumpInfo::new(
generate_uid(),
DumpStatus::InProgress
);
info.set_current();
let data = data.clone();
let dumps_dir = dumps_dir.to_path_buf();
let info_cloned = info.clone();
// run dump process in a new thread
thread::spawn(move ||
dump_process(data, dumps_dir, info_cloned)
);
Ok(info)
}

View File

@ -299,7 +299,7 @@ impl From<JsonPayloadError> for Error {
JsonPayloadError::Payload(err) => {
Error::BadRequest(format!("Problem while decoding the request: {}", err))
}
e => Error::Internal(format!("Unexpected Json error: {}", e))
e => Error::Internal(format!("Unexpected Json error: {}", e)),
}
}
}
@ -310,7 +310,7 @@ impl From<QueryPayloadError> for Error {
QueryPayloadError::Deserialize(err) => {
Error::BadRequest(format!("Invalid query parameters: {}", err))
}
e => Error::Internal(format!("Unexpected query payload error: {}", e))
e => Error::Internal(format!("Unexpected query payload error: {}", e)),
}
}
}

View File

@ -1,16 +1,16 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::body::Body;
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use actix_web::web;
use actix_web::body::Body;
use futures::ready;
use futures::future::{ok, Future, Ready};
use actix_web::ResponseError as _;
use futures::future::{ok, Future, Ready};
use futures::ready;
use pin_project::pin_project;
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::Data;
#[derive(Clone, Copy)]
pub enum Authentication {
@ -59,19 +59,15 @@ where
let data = req.app_data::<web::Data<Data>>().unwrap();
if data.api_keys().master.is_none() {
return AuthenticationFuture::Authenticated(self.service.call(req))
return AuthenticationFuture::Authenticated(self.service.call(req));
}
let auth_header = match req.headers().get("X-Meili-API-Key") {
Some(auth) => match auth.to_str() {
Ok(auth) => auth,
Err(_) => {
return AuthenticationFuture::NoHeader(Some(req))
}
Err(_) => return AuthenticationFuture::NoHeader(Some(req)),
},
None => {
return AuthenticationFuture::NoHeader(Some(req))
}
None => return AuthenticationFuture::NoHeader(Some(req)),
};
let authenticated = match self.acl {
@ -111,15 +107,13 @@ where
{
type Output = Result<ServiceResponse<Body>, actix_web::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) ->Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this {
AuthProj::Authenticated(fut) => {
match ready!(fut.poll(cx)) {
Ok(resp) => Poll::Ready(Ok(resp)),
Err(e) => Poll::Ready(Err(e)),
}
}
AuthProj::Authenticated(fut) => match ready!(fut.poll(cx)) {
Ok(resp) => Poll::Ready(Ok(resp)),
Err(e) => Poll::Ready(Err(e)),
},
AuthProj::NoHeader(req) => {
match req.take() {
Some(req) => {
@ -135,7 +129,8 @@ where
AuthProj::Refused(req) => {
match req.take() {
Some(req) => {
let bad_token = req.headers()
let bad_token = req
.headers()
.get("X-Meili-API-Key")
.map(|h| h.to_str().map(String::from).unwrap_or_default())
.unwrap_or_default();

View File

@ -0,0 +1,132 @@
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Context};
use heed::RoTxn;
use indexmap::IndexMap;
use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
use serde::{Deserialize, Serialize};
use crate::option::IndexerOpts;
use super::{update_handler::UpdateHandler, Index, Settings, Unchecked};
#[derive(Serialize, Deserialize)]
struct DumpMeta {
settings: Settings<Unchecked>,
primary_key: Option<String>,
}
const META_FILE_NAME: &str = "meta.json";
const DATA_FILE_NAME: &str = "documents.jsonl";
impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
// acquire write txn make sure any ongoing write is finished before we start.
let txn = self.env.write_txn()?;
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
Ok(())
}
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
let document_file_path = path.as_ref().join(DATA_FILE_NAME);
let mut document_file = File::create(&document_file_path)?;
let documents = self.all_documents(txn)?;
let fields_ids_map = self.fields_ids_map(txn)?;
// dump documents
let mut json_map = IndexMap::new();
for document in documents {
let (_, reader) = document?;
for (fid, bytes) in reader.iter() {
if let Some(name) = fields_ids_map.name(fid) {
json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
}
}
serde_json::to_writer(&mut document_file, &json_map)?;
document_file.write_all(b"\n")?;
json_map.clear();
}
Ok(())
}
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
let meta_file_path = path.as_ref().join(META_FILE_NAME);
let mut meta_file = File::create(&meta_file_path)?;
let settings = self.settings_txn(txn)?.into_unchecked();
let primary_key = self.primary_key(txn)?.map(String::from);
let meta = DumpMeta {
settings,
primary_key,
};
serde_json::to_writer(&mut meta_file, &meta)?;
Ok(())
}
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
let dir_name = src
.as_ref()
.file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME);
let mut meta_file = File::open(meta_path)?;
let DumpMeta {
settings,
primary_key,
} = serde_json::from_reader(&mut meta_file)?;
let settings = settings.check();
let index = Self::open(&dst_dir_path, size)?;
let mut txn = index.write_txn()?;
let handler = UpdateHandler::new(&indexing_options)?;
index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = File::open(&document_file_path)?;
let mut reader = BufReader::new(reader);
reader.fill_buf()?;
// If the document file is empty, we don't perform the document addition, to prevent
// a primary key error to be thrown.
if !reader.buffer().is_empty() {
index.update_documents_txn(
&mut txn,
JsonStream,
IndexDocumentsMethod::UpdateDocuments,
Some(reader),
handler.update_builder(0),
primary_key.as_deref(),
)?;
}
txn.commit()?;
match Arc::try_unwrap(index.0) {
Ok(inner) => inner.prepare_for_closing().wait(),
Err(_) => bail!("Could not close index properly."),
}
Ok(())
}
}

View File

@ -1,16 +1,23 @@
use std::{collections::{BTreeSet, HashSet}, marker::PhantomData};
use std::collections::{BTreeSet, HashSet};
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Context};
use heed::{EnvOpenOptions, RoTxn};
use milli::obkv_to_json;
use serde_json::{Map, Value};
use crate::helpers::EnvSizer;
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Facets, Settings, Checked, Unchecked};
use serde::{de::Deserializer, Deserialize};
pub use updates::{Checked, Facets, Settings, Unchecked};
mod dump;
mod search;
pub mod update_handler;
mod updates;
pub type Document = Map<String, Value>;
@ -26,19 +33,36 @@ impl Deref for Index {
}
}
pub fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
Deserialize::deserialize(deserializer).map(Some)
}
impl Index {
pub fn open(path: impl AsRef<Path>, size: usize) -> anyhow::Result<Self> {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path)?;
Ok(Index(Arc::new(index)))
}
pub fn settings(&self) -> anyhow::Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)
}
pub fn settings_txn(&self, txn: &RoTxn) -> anyhow::Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(&txn)?
.map(|fields| fields.into_iter().map(String::from).collect())
.unwrap_or_else(|| vec!["*".to_string()]);
.map(|fields| fields.into_iter().map(String::from).collect());
let searchable_attributes = self
.searchable_fields(&txn)?
.map(|fields| fields.into_iter().map(String::from).collect())
.unwrap_or_else(|| vec!["*".to_string()]);
.map(|fields| fields.into_iter().map(String::from).collect());
let faceted_attributes = self
.faceted_fields(&txn)?
@ -62,8 +86,8 @@ impl Index {
let distinct_attribute = self.distinct_attribute(&txn)?.map(String::from);
Ok(Settings {
displayed_attributes: Some(Some(displayed_attributes)),
searchable_attributes: Some(Some(searchable_attributes)),
displayed_attributes: Some(displayed_attributes),
searchable_attributes: Some(searchable_attributes),
attributes_for_faceting: Some(Some(faceted_attributes)),
ranking_rules: Some(Some(criteria)),
stop_words: Some(Some(stop_words)),

View File

@ -90,7 +90,8 @@ impl Index {
let mut documents = Vec::new();
let fields_ids_map = self.fields_ids_map(&rtxn).unwrap();
let displayed_ids = self.displayed_fields_ids(&rtxn)?
let displayed_ids = self
.displayed_fields_ids(&rtxn)?
.map(|fields| fields.into_iter().collect::<HashSet<_>>())
.unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect());
@ -156,10 +157,8 @@ impl Index {
};
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
for (_id, obkv) in self.documents(&rtxn, documents_ids)? {
let document = make_document(&all_attributes, &fields_ids_map, obkv)?;
@ -384,17 +383,16 @@ mod test {
#[test]
fn no_formatted() {
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
let mut fields = FieldsIdsMap::new();
let id = fields.insert("test").unwrap();
let mut buf = Vec::new();
let mut obkv = obkv::KvWriter::new(&mut buf);
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
.unwrap();
obkv.finish().unwrap();
let obkv = obkv::KvReader::new(&buf);
@ -410,8 +408,9 @@ mod test {
&highlighter,
&matching_words,
&all_formatted,
&to_highlight_ids
).unwrap();
&to_highlight_ids,
)
.unwrap();
assert!(value.is_empty());
}
@ -419,17 +418,16 @@ mod test {
#[test]
fn formatted_no_highlight() {
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
let mut fields = FieldsIdsMap::new();
let id = fields.insert("test").unwrap();
let mut buf = Vec::new();
let mut obkv = obkv::KvWriter::new(&mut buf);
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
.unwrap();
obkv.finish().unwrap();
let obkv = obkv::KvReader::new(&buf);
@ -445,8 +443,9 @@ mod test {
&highlighter,
&matching_words,
&all_formatted,
&to_highlight_ids
).unwrap();
&to_highlight_ids,
)
.unwrap();
assert_eq!(value["test"], "hello");
}
@ -454,17 +453,16 @@ mod test {
#[test]
fn formatted_with_highlight() {
let stop_words = fst::Set::default();
let highlighter = Highlighter::new(
&stop_words,
(String::from("<em>"), String::from("</em>")),
);
let highlighter =
Highlighter::new(&stop_words, (String::from("<em>"), String::from("</em>")));
let mut fields = FieldsIdsMap::new();
let id = fields.insert("test").unwrap();
let mut buf = Vec::new();
let mut obkv = obkv::KvWriter::new(&mut buf);
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes()).unwrap();
obkv.insert(id, Value::String("hello".into()).to_string().as_bytes())
.unwrap();
obkv.finish().unwrap();
let obkv = obkv::KvReader::new(&buf);
@ -480,8 +478,9 @@ mod test {
&highlighter,
&matching_words,
&all_formatted,
&to_highlight_ids
).unwrap();
&to_highlight_ids,
)
.unwrap();
assert_eq!(value["test"], "<em>hello</em>");
}

View File

@ -38,7 +38,7 @@ impl UpdateHandler {
})
}
fn update_builder(&self, update_id: u64) -> UpdateBuilder {
pub fn update_builder(&self, update_id: u64) -> UpdateBuilder {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new(update_id);
if let Some(max_nb_chunks) = self.max_nb_chunks {
@ -82,7 +82,7 @@ impl UpdateHandler {
),
ClearDocuments => index.clear_documents(update_builder),
DeleteDocuments => index.delete_documents(content, update_builder),
Settings(settings) => index.update_settings(settings, update_builder),
Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
};
match result {

View File

@ -1,28 +1,39 @@
use std::collections::{BTreeSet, HashMap};
use std::io;
use std::num::NonZeroUsize;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use flate2::read::GzDecoder;
use log::info;
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{de::Deserializer, Deserialize, Serialize};
use serde::{Deserialize, Serialize, Serializer};
use super::Index;
use crate::index_controller::UpdateResult;
#[derive(Clone, Default, Debug)]
use super::{deserialize_some, Index};
fn serialize_with_wildcard<S>(field: &Option<Option<Vec<String>>>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let wildcard = vec!["*".to_string()];
s.serialize_some(&field.as_ref().map(|o| o.as_ref().unwrap_or(&wildcard)))
}
#[derive(Clone, Default, Debug, Serialize)]
pub struct Checked;
#[derive(Clone, Default, Debug)]
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Unchecked;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[serde(bound(serialize = "T: Serialize", deserialize = "T: Deserialize<'static>"))]
pub struct Settings<T> {
#[serde(
default,
deserialize_with = "deserialize_some",
serialize_with = "serialize_with_wildcard",
skip_serializing_if = "Option::is_none"
)]
pub displayed_attributes: Option<Option<Vec<String>>>,
@ -30,11 +41,16 @@ pub struct Settings<T> {
#[serde(
default,
deserialize_with = "deserialize_some",
serialize_with = "serialize_with_wildcard",
skip_serializing_if = "Option::is_none"
)]
pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default)]
#[serde(
default,
deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none"
)]
pub attributes_for_faceting: Option<Option<HashMap<String, String>>>,
#[serde(
@ -72,6 +88,28 @@ impl Settings<Checked> {
_kind: PhantomData,
}
}
pub fn into_unchecked(self) -> Settings<Unchecked> {
let Self {
displayed_attributes,
searchable_attributes,
attributes_for_faceting,
ranking_rules,
stop_words,
distinct_attribute,
..
} = self;
Settings {
displayed_attributes,
searchable_attributes,
attributes_for_faceting,
ranking_rules,
stop_words,
distinct_attribute,
_kind: PhantomData,
}
}
}
impl Settings<Unchecked> {
@ -118,14 +156,6 @@ pub struct Facets {
pub min_level_size: Option<NonZeroUsize>,
}
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
Deserialize::deserialize(deserializer).map(Some)
}
impl Index {
pub fn update_documents(
&self,
@ -134,17 +164,37 @@ impl Index {
content: Option<impl io::Read>,
update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> {
let mut txn = self.write_txn()?;
let result = self.update_documents_txn(
&mut txn,
format,
method,
content,
update_builder,
primary_key,
)?;
txn.commit()?;
Ok(result)
}
pub fn update_documents_txn<'a, 'b>(
&'a self,
txn: &mut heed::RwTxn<'a, 'b>,
format: UpdateFormat,
method: IndexDocumentsMethod,
content: Option<impl io::Read>,
update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> {
info!("performing document addition");
// We must use the write transaction of the update here.
let mut wtxn = self.write_txn()?;
// Set the primary key if not set already, ignore if already set.
if let (None, Some(ref primary_key)) = (self.primary_key(&wtxn)?, primary_key) {
self.put_primary_key(&mut wtxn, primary_key)?;
if let (None, Some(ref primary_key)) = (self.primary_key(txn)?, primary_key) {
self.put_primary_key(txn, primary_key)?;
}
let mut builder = update_builder.index_documents(&mut wtxn, self);
let mut builder = update_builder.index_documents(txn, self);
builder.update_format(format);
builder.index_documents_method(method);
@ -152,19 +202,17 @@ impl Index {
|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
let gzipped = false;
let result = match content {
Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback),
Some(content) => builder.execute(content, indexing_callback),
None => builder.execute(std::io::empty(), indexing_callback),
let addition = match content {
Some(content) if gzipped => {
builder.execute(GzDecoder::new(content), indexing_callback)?
}
Some(content) => builder.execute(content, indexing_callback)?,
None => builder.execute(std::io::empty(), indexing_callback)?,
};
info!("document addition done: {:?}", result);
info!("document addition done: {:?}", addition);
result.and_then(|addition_result| {
wtxn.commit()
.and(Ok(UpdateResult::DocumentsAddition(addition_result)))
.map_err(Into::into)
})
Ok(UpdateResult::DocumentsAddition(addition))
}
pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
@ -181,14 +229,14 @@ impl Index {
}
}
pub fn update_settings(
&self,
pub fn update_settings_txn<'a, 'b>(
&'a self,
txn: &mut heed::RwTxn<'a, 'b>,
settings: &Settings<Checked>,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here.
let mut wtxn = self.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, self);
let mut builder = update_builder.settings(txn, self);
if let Some(ref names) = settings.searchable_attributes {
match names {
@ -230,16 +278,22 @@ impl Index {
}
}
let result = builder
.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
builder.execute(|indexing_step, update_id| {
info!("update {}: {:?}", update_id, indexing_step)
})?;
match result {
Ok(()) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e),
}
Ok(UpdateResult::Other)
}
pub fn update_settings(
&self,
settings: &Settings<Checked>,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
let mut txn = self.write_txn()?;
let result = self.update_settings_txn(&mut txn, settings, update_builder)?;
txn.commit()?;
Ok(result)
}
pub fn delete_documents(
@ -288,7 +342,10 @@ mod test {
let checked = settings.clone().check();
assert_eq!(settings.displayed_attributes, checked.displayed_attributes);
assert_eq!(settings.searchable_attributes, checked.searchable_attributes);
assert_eq!(
settings.searchable_attributes,
checked.searchable_attributes
);
// test wildcard
// test no changes

View File

@ -0,0 +1,156 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_stream::stream;
use chrono::Utc;
use futures::{lock::Mutex, stream::StreamExt};
use log::{error, info};
use tokio::sync::{mpsc, oneshot, RwLock};
use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle;
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask};
use crate::index_controller::{update_actor, uuid_resolver};
pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor<UuidResolver, Update> {
inbox: Option<mpsc::Receiver<DumpMsg>>,
uuid_resolver: UuidResolver,
update: Update,
dump_path: PathBuf,
lock: Arc<Mutex<()>>,
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
update_db_size: usize,
index_db_size: usize,
}
/// Generate uid from creation date
fn generate_uid() -> String {
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
}
impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
where
UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
Update: UpdateActorHandle + Send + Sync + Clone + 'static,
{
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolver,
update: Update,
dump_path: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
) -> Self {
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
let lock = Arc::new(Mutex::new(()));
Self {
inbox: Some(inbox),
uuid_resolver,
update,
dump_path: dump_path.as_ref().into(),
dump_infos,
lock,
index_db_size,
update_db_size,
}
}
pub async fn run(mut self) {
info!("Started dump actor.");
let mut inbox = self
.inbox
.take()
.expect("Dump Actor must have a inbox at this point.");
let stream = stream! {
loop {
match inbox.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
.await;
error!("Dump actor stopped.");
}
async fn handle_message(&self, msg: DumpMsg) {
use DumpMsg::*;
match msg {
CreateDump { ret } => {
let _ = self.handle_create_dump(ret).await;
}
DumpInfo { ret, uid } => {
let _ = ret.send(self.handle_dump_info(uid).await);
}
}
}
async fn handle_create_dump(&self, ret: oneshot::Sender<DumpResult<DumpInfo>>) {
let uid = generate_uid();
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
let _lock = match self.lock.try_lock() {
Some(lock) => lock,
None => {
ret.send(Err(DumpError::DumpAlreadyRunning))
.expect("Dump actor is dead");
return;
}
};
self.dump_infos
.write()
.await
.insert(uid.clone(), info.clone());
ret.send(Ok(info)).expect("Dump actor is dead");
let task = DumpTask {
path: self.dump_path.clone(),
uuid_resolver: self.uuid_resolver.clone(),
update_handle: self.update.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,
index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn(task.run()).await;
let mut dump_infos = self.dump_infos.write().await;
let dump_infos = dump_infos
.get_mut(&uid)
.expect("dump entry deleted while lock was acquired");
match task_result {
Ok(Ok(())) => {
dump_infos.done();
info!("Dump succeed");
}
Ok(Err(e)) => {
dump_infos.with_error(e.to_string());
error!("Dump failed: {}", e);
}
Err(_) => {
dump_infos.with_error("Unexpected error while performing dump.".to_string());
error!("Dump panicked. Dump status set to failed");
}
};
}
async fn handle_dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
match self.dump_infos.read().await.get(&uid) {
Some(info) => Ok(info.clone()),
_ => Err(DumpError::DumpDoesNotExist(uid)),
}
}
}

View File

@ -0,0 +1,52 @@
use std::path::Path;
use actix_web::web::Bytes;
use tokio::sync::{mpsc, oneshot};
use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg, DumpResult};
#[derive(Clone)]
pub struct DumpActorHandleImpl {
sender: mpsc::Sender<DumpMsg>,
}
#[async_trait::async_trait]
impl DumpActorHandle for DumpActorHandleImpl {
async fn create_dump(&self) -> DumpResult<DumpInfo> {
let (ret, receiver) = oneshot::channel();
let msg = DumpMsg::CreateDump { ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo> {
let (ret, receiver) = oneshot::channel();
let msg = DumpMsg::DumpInfo { ret, uid };
let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
}
impl DumpActorHandleImpl {
pub fn new(
path: impl AsRef<Path>,
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
index_db_size: usize,
update_db_size: usize,
) -> anyhow::Result<Self> {
let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(
receiver,
uuid_resolver,
update,
path,
index_db_size,
update_db_size,
);
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
}

View File

@ -0,0 +1,2 @@
pub mod v1;
pub mod v2;

View File

@ -0,0 +1,183 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fs::{create_dir_all, File};
use std::io::BufRead;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use heed::EnvOpenOptions;
use log::{error, info, warn};
use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata};
use crate::{
index::{deserialize_some, update_handler::UpdateHandler, Index, Unchecked},
option::IndexerOpts,
};
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
db_version: String,
indexes: Vec<IndexMetadata>,
}
impl MetadataV1 {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump, dump database version: {}, dump version: V1",
self.db_version
);
let uuid_store = HeedUuidStore::new(&dst)?;
for index in self.indexes {
let uuid = Uuid::new_v4();
uuid_store.insert(index.uid.clone(), uuid)?;
let src = src.as_ref().join(index.uid);
load_index(
&src,
&dst,
uuid,
index.meta.primary_key.as_deref(),
size,
indexer_options,
)?;
}
Ok(())
}
}
// These are the settings used in legacy meilisearch (<v0.21.0).
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct Settings {
#[serde(default, deserialize_with = "deserialize_some")]
pub ranking_rules: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub distinct_attribute: Option<Option<String>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub displayed_attributes: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub stop_words: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub attributes_for_faceting: Option<Option<Vec<String>>>,
}
fn load_index(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
uuid: Uuid,
primary_key: Option<&str>,
size: usize,
indexer_options: &IndexerOpts,
) -> anyhow::Result<()> {
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
// extract `settings.json` file and import content
let settings = import_settings(&src)?;
let settings: index_controller::Settings<Unchecked> = settings.into();
let mut txn = index.write_txn()?;
let handler = UpdateHandler::new(&indexer_options)?;
index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?;
let file = File::open(&src.as_ref().join("documents.jsonl"))?;
let mut reader = std::io::BufReader::new(file);
reader.fill_buf()?;
if !reader.buffer().is_empty() {
index.update_documents_txn(
&mut txn,
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
handler.update_builder(0),
primary_key,
)?;
}
txn.commit()?;
// Finaly, we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "Couldn't close the index properly")
.unwrap()
.prepare_for_closing()
.wait();
// Updates are ignored in dumps V1.
Ok(())
}
/// we need to **always** be able to convert the old settings to the settings currently being used
impl From<Settings> for index_controller::Settings<Unchecked> {
fn from(settings: Settings) -> Self {
if settings.synonyms.flatten().is_some() {
error!("`synonyms` are not yet implemented and thus will be ignored");
}
Self {
distinct_attribute: settings.distinct_attribute,
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
searchable_attributes: settings.searchable_attributes,
// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
// representing the name of the faceted field + the type of the field. Since the type
// was not known in the V1 of the dump we are just going to assume everything is a
// String
attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
match criterion.as_str() {
"words" | "typo" | "proximity" | "attribute" => Some(criterion),
s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
"wordsPosition" => {
warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored");
Some(String::from("words"))
}
"exactness" => {
error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
None
}
s => {
error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
None
}
}
}).collect())),
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
_kind: PhantomData,
}
}
}
/// Extract Settings from `settings.json` file present at provided `dir_path`
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
let path = dir_path.as_ref().join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}

View File

@ -0,0 +1,59 @@
use std::path::Path;
use chrono::{DateTime, Utc};
use log::info;
use serde::{Deserialize, Serialize};
use crate::index::Index;
use crate::index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore};
use crate::option::IndexerOpts;
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV2 {
db_version: String,
index_db_size: usize,
update_db_size: usize,
dump_date: DateTime<Utc>,
}
impl MetadataV2 {
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
Self {
db_version: env!("CARGO_PKG_VERSION").to_string(),
index_db_size,
update_db_size,
dump_date: Utc::now(),
}
}
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V2",
self.dump_date, self.db_version
);
info!("Loading index database.");
HeedUuidStore::load_dump(src.as_ref(), &dst)?;
info!("Loading updates.");
UpdateStore::load_dump(&src, &dst, update_db_size)?;
info!("Loading indexes.");
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
for index in indexes {
let index = index?;
Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?;
}
Ok(())
}
}

View File

@ -0,0 +1,13 @@
use tokio::sync::oneshot;
use super::{DumpInfo, DumpResult};
pub enum DumpMsg {
CreateDump {
ret: oneshot::Sender<DumpResult<DumpInfo>>,
},
DumpInfo {
uid: String,
ret: oneshot::Sender<DumpResult<DumpInfo>>,
},
}

View File

@ -0,0 +1,214 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use anyhow::Context;
use chrono::{DateTime, Utc};
use log::{error, info, warn};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs::create_dir_all;
use loaders::v1::MetadataV1;
use loaders::v2::MetadataV2;
pub use actor::DumpActor;
pub use handle_impl::*;
pub use message::DumpMsg;
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
use crate::{helpers::compression, option::IndexerOpts};
mod actor;
mod handle_impl;
mod loaders;
mod message;
const META_FILE_NAME: &str = "metadata.json";
pub type DumpResult<T> = std::result::Result<T, DumpError>;
#[derive(Error, Debug)]
pub enum DumpError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("dump already running")]
DumpAlreadyRunning,
#[error("dump `{0}` does not exist")]
DumpDoesNotExist(String),
}
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait DumpActorHandle {
/// Start the creation of a dump
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
async fn create_dump(&self) -> DumpResult<DumpInfo>;
/// Return the status of an already created dump
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_status]
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo>;
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "dumpVersion")]
pub enum Metadata {
V1(MetadataV1),
V2(MetadataV2),
}
impl Metadata {
pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self {
let meta = MetadataV2::new(index_db_size, update_db_size);
Self::V2(meta)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DumpStatus {
Done,
InProgress,
Failed,
}
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DumpInfo {
pub uid: String,
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
started_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
finished_at: Option<DateTime<Utc>>,
}
impl DumpInfo {
pub fn new(uid: String, status: DumpStatus) -> Self {
Self {
uid,
status,
error: None,
started_at: Utc::now(),
finished_at: None,
}
}
pub fn with_error(&mut self, error: String) {
self.status = DumpStatus::Failed;
self.finished_at = Some(Utc::now());
self.error = Some(error);
}
pub fn done(&mut self) {
self.finished_at = Some(Utc::now());
self.status = DumpStatus::Done;
}
pub fn dump_already_in_progress(&self) -> bool {
self.status == DumpStatus::InProgress
}
}
pub fn load_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
let tmp_src = tempfile::tempdir_in(".")?;
let tmp_src_path = tmp_src.path();
compression::from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?;
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
let dst_dir = dst_path
.as_ref()
.parent()
.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
match meta {
Metadata::V1(meta) => {
meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)?
}
Metadata::V2(meta) => meta.load_dump(
&tmp_src_path,
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?,
}
// Persist and atomically rename the db
let persisted_dump = tmp_dst.into_path();
if dst_path.as_ref().exists() {
warn!("Overwriting database at {}", dst_path.as_ref().display());
std::fs::remove_dir_all(&dst_path)?;
}
std::fs::rename(&persisted_dump, &dst_path)?;
Ok(())
}
struct DumpTask<U, P> {
path: PathBuf,
uuid_resolver: U,
update_handle: P,
uid: String,
update_db_size: usize,
index_db_size: usize,
}
impl<U, P> DumpTask<U, P>
where
U: UuidResolverHandle + Send + Sync + Clone + 'static,
P: UpdateActorHandle + Send + Sync + Clone + 'static,
{
async fn run(self) -> anyhow::Result<()> {
info!("Performing dump.");
create_dir_all(&self.path).await?;
let path_clone = self.path.clone();
let temp_dump_dir =
tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
self.update_handle
.dump(uuids, temp_dump_path.clone())
.await?;
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
let dump_path = self.path.join(self.uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
}

View File

@ -6,14 +6,15 @@ use async_stream::stream;
use futures::stream::StreamExt;
use heed::CompactionOption;
use log::debug;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use tokio::{fs, sync::mpsc};
use uuid::Uuid;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index::{
update_handler::UpdateHandler, Checked, Document, SearchQuery, SearchResult, Settings,
};
use crate::index_controller::{
get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed,
Processing,
get_arc_ownership_blocking, Failed, IndexStats, Processed, Processing,
};
use crate::option::IndexerOpts;
@ -30,12 +31,19 @@ pub struct IndexActor<S> {
impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new(receiver: mpsc::Receiver<IndexMsg>, store: S) -> IndexResult<Self> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = UpdateHandler::new(&options)?;
let update_handler = Arc::new(update_handler);
let receiver = Some(receiver);
Ok(Self { receiver, update_handler, store })
Ok(Self {
receiver,
update_handler,
store,
})
}
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
/// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time.
pub async fn run(mut self) {
let mut receiver = self
.receiver
@ -119,6 +127,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Dump { uuid, path, ret } => {
let _ = ret.send(self.handle_dump(uuid, path).await);
}
GetStats { uuid, ret } => {
let _ = ret.send(self.handle_get_stats(uuid).await);
}
@ -140,9 +151,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
primary_key: Option<String>,
) -> IndexResult<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
@ -159,9 +168,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
None => self.store.create(uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
let result =
spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?;
Ok(result)
}
async fn handle_settings(&self, uuid: Uuid) -> IndexResult<Settings<Checked>> {
@ -170,9 +179,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.settings().map_err(IndexError::Error))
.await
.map_err(|e| IndexError::Error(e.into()))?
let result = spawn_blocking(move || index.settings()).await??;
Ok(result)
}
async fn handle_fetch_documents(
@ -187,13 +195,11 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
let result =
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve))
.await??;
Ok(result)
}
async fn handle_fetch_document(
@ -207,13 +213,12 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
let result =
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve))
.await??;
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> IndexResult<()> {
@ -236,9 +241,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
async fn handle_get_meta(&self, uuid: Uuid) -> IndexResult<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
let meta = spawn_blocking(move || IndexMeta::new(&index)).await??;
Ok(meta)
}
None => Err(IndexError::UnexistingIndex),
@ -256,7 +259,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || match index_settings.primary_key {
let result = spawn_blocking(move || match index_settings.primary_key {
Some(ref primary_key) => {
let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() {
@ -272,23 +275,22 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(meta)
}
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.await??;
Ok(result)
}
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> IndexResult<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
create_dir_all(&path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
create_dir_all(&path).await?;
if let Some(index) = self.store.get(uuid).await? {
let mut index_path = path.join(format!("index-{}", uuid));
create_dir_all(&index_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
create_dir_all(&index_path).await?;
index_path.push("data.mdb");
spawn_blocking(move || -> anyhow::Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot.
@ -298,14 +300,29 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(())
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
.await??;
}
Ok(())
}
/// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the
/// documents and all the settings.
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
let path = path.join(format!("indexes/index-{}/", uuid));
fs::create_dir_all(&path).await?;
tokio::task::spawn_blocking(move || index.dump(path)).await??;
Ok(())
}
async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
let index = self
.store
@ -323,7 +340,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
fields_distribution: index.fields_distribution(&rtxn)?,
})
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.await?
}
}

View File

@ -3,7 +3,10 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::{index::Checked, index_controller::{IndexSettings, IndexStats, Processing}};
use crate::{
index::Checked,
index_controller::{IndexSettings, IndexStats, Processing},
};
use crate::{
index::{Document, SearchQuery, SearchResult, Settings},
index_controller::{Failed, Processed},
@ -136,6 +139,13 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Dump { uuid, path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetStats { uuid, ret };

View File

@ -3,7 +3,7 @@ use std::path::PathBuf;
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult, Settings, Checked};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use super::{IndexMeta, IndexResult, IndexSettings};
@ -60,6 +60,11 @@ pub enum IndexMsg {
path: PathBuf,
ret: oneshot::Sender<IndexResult<()>>,
},
Dump {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<IndexResult<()>>,
},
GetStats {
uuid: Uuid,
ret: oneshot::Sender<IndexResult<IndexStats>>,

View File

@ -15,7 +15,7 @@ use message::IndexMsg;
use store::{IndexStore, MapIndexStore};
use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{Failed, Processed, Processing, IndexStats};
use crate::index_controller::{Failed, IndexStats, Processed, Processing};
use super::IndexSettings;
@ -31,7 +31,7 @@ pub type IndexResult<T> = std::result::Result<T, IndexError>;
pub struct IndexMeta {
created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
primary_key: Option<String>,
pub primary_key: Option<String>,
}
impl IndexMeta {
@ -44,24 +44,45 @@ impl IndexMeta {
let created_at = index.created_at(&txn)?;
let updated_at = index.updated_at(&txn)?;
let primary_key = index.primary_key(&txn)?.map(String::from);
Ok(Self { created_at, updated_at, primary_key })
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
#[derive(Error, Debug)]
pub enum IndexError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("index already exists")]
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("Existing primary key")]
ExistingPrimaryKey,
#[error("Internal Index Error: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for IndexError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
anyhow::Error,
heed::Error,
tokio::task::JoinError,
std::io::Error
);
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait IndexActorHandle {
@ -97,6 +118,7 @@ pub trait IndexActorHandle {
index_settings: IndexSettings,
) -> IndexResult<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()>;
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats>;
}
@ -177,6 +199,10 @@ mod test {
self.as_ref().snapshot(uuid, path).await
}
async fn dump(&self, uuid: Uuid, path: PathBuf) -> IndexResult<()> {
self.as_ref().dump(uuid, path).await
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
self.as_ref().get_index_stats(uuid).await
}

View File

@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use heed::EnvOpenOptions;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
@ -48,7 +47,7 @@ impl IndexStore for MapIndexStore {
let index_size = self.index_size;
let index = spawn_blocking(move || -> IndexResult<Index> {
let index = open_index(&path, index_size)?;
let index = Index::open(path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
index.put_primary_key(&mut txn, &primary_key)?;
@ -56,8 +55,7 @@ impl IndexStore for MapIndexStore {
}
Ok(index)
})
.await
.map_err(|e| IndexError::Error(e.into()))??;
.await??;
self.index_store.write().await.insert(uuid, index.clone());
@ -77,9 +75,7 @@ impl IndexStore for MapIndexStore {
}
let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size))
.await
.map_err(|e| IndexError::Error(e.into()))??;
let index = spawn_blocking(move || Index::open(path, index_size)).await??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
@ -88,18 +84,8 @@ impl IndexStore for MapIndexStore {
async fn delete(&self, uuid: Uuid) -> IndexResult<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
fs::remove_dir_all(db_path).await?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> {
std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path).map_err(IndexError::Error)?;
Ok(Index(Arc::new(index)))
}

View File

@ -14,19 +14,23 @@ use tokio::sync::mpsc;
use tokio::time::sleep;
use uuid::Uuid;
pub use updates::*;
use dump_actor::DumpActorHandle;
pub use dump_actor::{DumpInfo, DumpStatus};
use index_actor::IndexActorHandle;
use snapshot::{SnapshotService, load_snapshot};
use snapshot::{load_snapshot, SnapshotService};
use update_actor::UpdateActorHandle;
use uuid_resolver::{UuidError, UuidResolverHandle};
pub use updates::*;
use uuid_resolver::{UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt;
use self::dump_actor::load_dump;
mod dump_actor;
mod index_actor;
mod snapshot;
mod update_actor;
mod update_handler;
mod updates;
mod uuid_resolver;
@ -60,10 +64,12 @@ pub struct IndexStats {
pub fields_distribution: FieldsDistribution,
}
#[derive(Clone)]
pub struct IndexController {
uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
index_handle: index_actor::IndexActorHandleImpl,
update_handle: update_actor::UpdateActorHandleImpl<Bytes>,
dump_handle: dump_actor::DumpActorHandleImpl,
}
#[derive(Serialize)]
@ -87,6 +93,14 @@ impl IndexController {
options.ignore_snapshot_if_db_exists,
options.ignore_missing_snapshot,
)?;
} else if let Some(ref src_path) = options.import_dump {
load_dump(
&options.db_path,
src_path,
options.max_mdb_size.get_bytes() as usize,
options.max_udb_size.get_bytes() as usize,
&options.indexer_options,
)?;
}
std::fs::create_dir_all(&path)?;
@ -98,6 +112,13 @@ impl IndexController {
&path,
update_store_size,
)?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(
&options.dumps_dir,
uuid_resolver.clone(),
update_handle.clone(),
options.max_mdb_size.get_bytes() as usize,
options.max_udb_size.get_bytes() as usize,
)?;
if options.schedule_snapshot {
let snapshot_service = SnapshotService::new(
@ -119,6 +140,7 @@ impl IndexController {
uuid_resolver,
index_handle,
update_handle,
dump_handle,
})
}
@ -143,11 +165,6 @@ impl IndexController {
// registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move {
payload
.map(|bytes| {
bytes.map_err(|e| {
Box::new(e) as Box<dyn std::error::Error + Sync + Send + 'static>
})
})
.for_each(|r| async {
let _ = sender.send(r).await;
})
@ -160,7 +177,7 @@ impl IndexController {
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_update(uuid).await?),
Err(UuidError::UnexistingIndex(name)) => {
Err(UuidResolverError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?;
// ignore if index creation fails now, since it may already have been created
@ -206,7 +223,7 @@ impl IndexController {
create: bool,
) -> anyhow::Result<UpdateStatus> {
let perform_udpate = |uuid| async move {
let meta = UpdateMeta::Settings(settings);
let meta = UpdateMeta::Settings(settings.into_unchecked());
// Nothing so send, drop the sender right away, as not to block the update actor.
let (_, receiver) = mpsc::channel(1);
self.update_handle.update(meta, receiver, uuid).await
@ -214,7 +231,7 @@ impl IndexController {
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_udpate(uuid).await?),
Err(UuidError::UnexistingIndex(name)) if create => {
Err(UuidResolverError::UnexistingIndex(name)) if create => {
let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?;
// ignore if index creation fails now, since it may already have been created
@ -393,6 +410,14 @@ impl IndexController {
indexes,
})
}
pub async fn create_dump(&self) -> anyhow::Result<DumpInfo> {
Ok(self.dump_handle.create_dump().await?)
}
pub async fn dump_info(&self, uid: String) -> anyhow::Result<DumpInfo> {
Ok(self.dump_handle.dump_info(uid).await?)
}
}
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {

View File

@ -144,7 +144,7 @@ mod test {
use crate::index_controller::update_actor::{
MockUpdateActorHandle, UpdateActorHandleImpl, UpdateError,
};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidResolverError};
#[actix_rt::test]
async fn test_normal() {
@ -193,7 +193,7 @@ mod test {
.expect_snapshot()
.times(1)
// abitrary error
.returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));
.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();
@ -248,7 +248,7 @@ mod test {
// we expect the funtion to be called between 2 and 3 time in the given interval.
.times(2..4)
// abitrary error, to short-circuit the function
.returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));
.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();

View File

@ -11,7 +11,7 @@ use tokio::sync::mpsc;
use uuid::Uuid;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use crate::index_controller::index_actor::{IndexActorHandle};
use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, I> {
@ -42,7 +42,12 @@ where
let store = UpdateStore::open(options, &path, index_handle.clone())?;
std::fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists());
Ok(Self { path, store, inbox, index_handle })
Ok(Self {
path,
store,
inbox,
index_handle,
})
}
pub async fn run(mut self) {
@ -72,6 +77,9 @@ where
Some(Snapshot { uuids, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
}
Some(Dump { uuids, path, ret }) => {
let _ = ret.send(self.handle_dump(uuids, path).await);
}
Some(GetInfo { ret }) => {
let _ = ret.send(self.handle_get_info().await);
}
@ -86,11 +94,8 @@ where
meta: UpdateMeta,
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> {
let file_path = match meta {
UpdateMeta::DocumentsAddition { .. }
| UpdateMeta::DeleteDocuments => {
UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => {
let update_file_id = uuid::Uuid::new_v4();
let path = self
.path
@ -100,39 +105,26 @@ where
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
.await?;
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
match bytes {
Ok(bytes) => {
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref())
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
}
}
let bytes = bytes?;
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref()).await?;
}
if file_len != 0 {
file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.flush().await?;
let file = file.into_std().await;
Some((file, path))
Some((file, update_file_id))
} else {
// empty update, delete the empty file.
fs::remove_file(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
fs::remove_file(&path).await?;
None
}
}
_ => None
_ => None,
};
let update_store = self.store.clone();
@ -141,52 +133,45 @@ where
use std::io::{copy, sink, BufReader, Seek};
// If the payload is empty, ignore the check.
let path = if let Some((mut file, path)) = file_path {
let update_uuid = if let Some((mut file, uuid)) = file_path {
// set the file back to the beginning
file.seek(SeekFrom::Start(0)).map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))?;
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message:
file.seek(SeekFrom::Start(0))
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))?;
let _: serde_json::Value = serde_json::from_reader(file)?;
}
Some(path)
Some(uuid)
} else {
None
};
// The payload is valid, we can register it to the update store.
update_store
.register_update(meta, path, uuid)
.map(UpdateStatus::Enqueued)
.map_err(|e| UpdateError::Error(Box::new(e)))
let status = update_store
.register_update(meta, update_uuid, uuid)
.map(UpdateStatus::Enqueued)?;
Ok(status)
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
.await?
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || {
let result = update_store
.list(uuid)
.map_err(|e| UpdateError::Error(e.into()))?;
let result = update_store.list(uuid)?;
Ok(result)
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
.await?
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self.store.clone();
let result = store
.meta(uuid, id)
.map_err(|e| UpdateError::Error(Box::new(e)))?
.meta(uuid, id)?
.ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result)
}
@ -194,10 +179,7 @@ where
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.clone();
tokio::task::spawn_blocking(move || store.delete_all(uuid))
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
Ok(())
}
@ -206,10 +188,21 @@ where
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
.await??;
Ok(())
}
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(())
})
.await??;
Ok(())
}
@ -220,9 +213,7 @@ where
let info = update_store.get_info()?;
Ok(info)
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
.await??;
Ok(info)
}

View File

@ -71,6 +71,13 @@ where
receiver.await.expect("update actor killed.")
}
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Dump { uuids, path, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn get_info(&self) -> Result<UpdateStoreInfo> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetInfo { ret };

View File

@ -31,6 +31,11 @@ pub enum UpdateMsg<D> {
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
Dump {
uuids: HashSet<Uuid>,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
GetInfo {
ret: oneshot::Sender<Result<UpdateStoreInfo>>,
},

View File

@ -1,10 +1,11 @@
mod actor;
mod handle_impl;
mod message;
mod update_store;
pub mod store;
use std::{collections::HashSet, path::PathBuf};
use actix_http::error::PayloadError;
use thiserror::Error;
use tokio::sync::mpsc;
use uuid::Uuid;
@ -13,25 +14,45 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor;
use message::UpdateMsg;
use update_store::UpdateStore;
pub use update_store::UpdateStoreInfo;
pub use handle_impl::UpdateActorHandleImpl;
pub use store::{UpdateStore, UpdateStoreInfo};
pub type Result<T> = std::result::Result<T, UpdateError>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;
type PayloadData<D> = std::result::Result<D, PayloadError>;
#[cfg(test)]
use mockall::automock;
#[derive(Debug, Error)]
pub enum UpdateError {
#[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
#[error("Internal error processing update: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UpdateError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
std::io::Error,
serde_json::Error,
PayloadError,
tokio::task::JoinError,
anyhow::Error
);
#[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle {
@ -40,7 +61,8 @@ pub trait UpdateActorHandle {
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn snapshot(&self, uuid: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn update(
&self,

View File

@ -0,0 +1,86 @@
use std::{borrow::Cow, convert::TryInto, mem::size_of};
use heed::{BytesDecode, BytesEncode};
use uuid::Uuid;
pub struct NextIdCodec;
pub enum NextIdKey {
Global,
Index(Uuid),
}
impl<'a> BytesEncode<'a> for NextIdCodec {
type EItem = NextIdKey;
fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
match item {
NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
}
}
}
pub struct PendingKeyCodec;
impl<'a> BytesEncode<'a> for PendingKeyCodec {
type EItem = (u64, Uuid, u64);
fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(&global_id.to_be_bytes());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
type DItem = (u64, Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
let global_id = u64::from_be_bytes(global_id_bytes);
let uuid_bytes = bytes
.get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
.try_into()
.ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes
.get((size_of::<u64>() + size_of::<Uuid>())..)?
.try_into()
.ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((global_id, uuid, update_id))
}
}
pub struct UpdateKeyCodec;
impl<'a> BytesEncode<'a> for UpdateKeyCodec {
type EItem = (Uuid, u64);
fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
type DItem = (Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((uuid, update_id))
}
}

View File

@ -0,0 +1,189 @@
use std::{
collections::HashSet,
fs::{create_dir_all, File},
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
};
use heed::{EnvOpenOptions, RoTxn};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::UpdateStore;
use super::{codec::UpdateKeyCodec, State};
use crate::index_controller::{
index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued,
UpdateStatus,
};
#[derive(Serialize, Deserialize)]
struct UpdateEntry {
uuid: Uuid,
update: UpdateStatus,
}
impl UpdateStore {
pub fn dump(
&self,
uuids: &HashSet<Uuid>,
path: PathBuf,
handle: impl IndexActorHandle,
) -> anyhow::Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
// txn must *always* be acquired after state lock, or it will dead lock.
let txn = self.env.write_txn()?;
let dump_path = path.join("updates");
create_dir_all(&dump_path)?;
self.dump_updates(&txn, uuids, &dump_path)?;
let fut = dump_indexes(uuids, handle, &path);
tokio::runtime::Handle::current().block_on(fut)?;
state_lock.swap(State::Idle);
Ok(())
}
fn dump_updates(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
let dump_data_path = path.as_ref().join("data.jsonl");
let mut dump_data_file = File::create(dump_data_path)?;
let update_files_path = path.as_ref().join(super::UPDATE_DIR);
create_dir_all(&update_files_path)?;
self.dump_pending(&txn, uuids, &mut dump_data_file, &path)?;
self.dump_completed(&txn, uuids, &mut dump_data_file)?;
Ok(())
}
fn dump_pending(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: &mut File,
dst_path: impl AsRef<Path>,
) -> anyhow::Result<()> {
let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
for pending in pendings {
let ((_, uuid, _), data) = pending?;
if uuids.contains(&uuid) {
let update = data.decode()?;
if let Some(ref update_uuid) = update.content {
let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
std::fs::copy(src, dst)?;
}
let update_json = UpdateEntry {
uuid,
update: update.into(),
};
serde_json::to_writer(&mut file, &update_json)?;
file.write_all(b"\n")?;
}
}
Ok(())
}
fn dump_completed(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: &mut File,
) -> anyhow::Result<()> {
let updates = self
.updates
.iter(txn)?
.remap_key_type::<UpdateKeyCodec>()
.lazily_decode_data();
for update in updates {
let ((uuid, _), data) = update?;
if uuids.contains(&uuid) {
let update = data.decode()?;
let update_json = UpdateEntry { uuid, update };
serde_json::to_writer(&mut file, &update_json)?;
file.write_all(b"\n")?;
}
}
Ok(())
}
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
db_size: usize,
) -> anyhow::Result<()> {
let dst_update_path = dst.as_ref().join("updates/");
create_dir_all(&dst_update_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(db_size as usize);
let (store, _) = UpdateStore::new(options, &dst_update_path)?;
let src_update_path = src.as_ref().join("updates");
let update_data = File::open(&src_update_path.join("data.jsonl"))?;
let mut update_data = BufReader::new(update_data);
std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
let mut wtxn = store.env.write_txn()?;
let mut line = String::new();
loop {
match update_data.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
store.register_raw_updates(&mut wtxn, &update, uuid)?;
// Copy ascociated update path if it exists
if let UpdateStatus::Enqueued(Enqueued {
content: Some(uuid),
..
}) = update
{
let src = update_uuid_to_file_path(&src_update_path, uuid);
let dst = update_uuid_to_file_path(&dst_update_path, uuid);
std::fs::copy(src, dst)?;
}
}
_ => break,
}
line.clear();
}
wtxn.commit()?;
Ok(())
}
}
async fn dump_indexes(
uuids: &HashSet<Uuid>,
handle: impl IndexActorHandle,
path: impl AsRef<Path>,
) -> anyhow::Result<()> {
for uuid in uuids {
handle.dump(*uuid, path.as_ref().to_owned()).await?;
}
Ok(())
}

View File

@ -1,38 +1,35 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
mod codec;
pub mod dump;
use std::fs::{copy, create_dir_all, remove_file, File};
use std::mem::size_of;
use std::path::Path;
use std::sync::Arc;
use std::{
collections::{BTreeMap, HashSet},
path::PathBuf,
};
use anyhow::Context;
use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use log::error;
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use uuid::Uuid;
use codec::*;
use super::UpdateMeta;
use crate::index_controller::{updates::*, IndexActorHandle};
use crate::{
helpers::EnvSizer,
index_controller::index_actor::{IndexResult, CONCURRENT_INDEX_MSG},
};
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
use crate::{helpers::EnvSizer, index_controller::index_actor::IndexResult};
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;
struct NextIdCodec;
enum NextIdKey {
Global,
Index(Uuid),
}
const UPDATE_DIR: &str = "update_files";
pub struct UpdateStoreInfo {
/// Size of the update store in bytes.
@ -47,13 +44,13 @@ pub struct StateLock {
data: ArcSwap<State>,
}
struct StateLockGuard<'a> {
pub struct StateLockGuard<'a> {
_lock: MutexGuard<'a, ()>,
state: &'a StateLock,
}
impl StateLockGuard<'_> {
fn swap(&self, state: State) -> Arc<State> {
pub fn swap(&self, state: State) -> Arc<State> {
self.state.data.swap(Arc::new(state))
}
}
@ -65,11 +62,11 @@ impl StateLock {
Self { lock, data }
}
fn read(&self) -> Arc<State> {
pub fn read(&self) -> Arc<State> {
self.data.load().clone()
}
fn write(&self) -> StateLockGuard {
pub fn write(&self) -> StateLockGuard {
let _lock = self.lock.lock();
let state = &self;
StateLockGuard { _lock, state }
@ -81,81 +78,7 @@ pub enum State {
Idle,
Processing(Uuid, Processing),
Snapshoting,
}
impl<'a> BytesEncode<'a> for NextIdCodec {
type EItem = NextIdKey;
fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
match item {
NextIdKey::Global => Some(Cow::Borrowed(b"__global__")),
NextIdKey::Index(ref uuid) => Some(Cow::Borrowed(uuid.as_bytes())),
}
}
}
struct PendingKeyCodec;
impl<'a> BytesEncode<'a> for PendingKeyCodec {
type EItem = (u64, Uuid, u64);
fn bytes_encode((global_id, uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(&global_id.to_be_bytes());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for PendingKeyCodec {
type DItem = (u64, Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let global_id_bytes = bytes.get(0..size_of::<u64>())?.try_into().ok()?;
let global_id = u64::from_be_bytes(global_id_bytes);
let uuid_bytes = bytes
.get(size_of::<u64>()..(size_of::<u64>() + size_of::<Uuid>()))?
.try_into()
.ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes
.get((size_of::<u64>() + size_of::<Uuid>())..)?
.try_into()
.ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((global_id, uuid, update_id))
}
}
struct UpdateKeyCodec;
impl<'a> BytesEncode<'a> for UpdateKeyCodec {
type EItem = (Uuid, u64);
fn bytes_encode((uuid, update_id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let mut bytes = Vec::with_capacity(size_of::<Self::EItem>());
bytes.extend_from_slice(uuid.as_bytes());
bytes.extend_from_slice(&update_id.to_be_bytes());
Some(Cow::Owned(bytes))
}
}
impl<'a> BytesDecode<'a> for UpdateKeyCodec {
type DItem = (Uuid, u64);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let uuid_bytes = bytes.get(0..size_of::<Uuid>())?.try_into().ok()?;
let uuid = Uuid::from_bytes(uuid_bytes);
let update_id_bytes = bytes.get(size_of::<Uuid>()..)?.try_into().ok()?;
let update_id = u64::from_be_bytes(update_id_bytes);
Some((uuid, update_id))
}
Dumping,
}
#[derive(Clone)]
@ -175,46 +98,61 @@ pub struct UpdateStore {
/// | 16-bytes | 8-bytes |
updates: Database<ByteSlice, SerdeJson<UpdateStatus>>,
/// Indicates the current state of the update store,
state: Arc<StateLock>,
pub state: Arc<StateLock>,
/// Wake up the loop when a new event occurs.
notification_sender: mpsc::Sender<()>,
path: PathBuf,
}
impl UpdateStore {
pub fn open(
fn new(
mut options: EnvOpenOptions,
path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
) -> anyhow::Result<Arc<Self>> {
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
options.max_dbs(5);
let env = options.open(path)?;
let env = options.open(&path)?;
let pending_queue = env.create_database(Some("pending-queue"))?;
let next_update_id = env.create_database(Some("next-update-id"))?;
let updates = env.create_database(Some("updates"))?;
let (notification_sender, mut notification_receiver) = mpsc::channel(10);
// Send a first notification to trigger the process.
let _ = notification_sender.send(());
let state = Arc::new(StateLock::from_state(State::Idle));
let (notification_sender, notification_receiver) = mpsc::channel(10);
Ok((
Self {
env,
pending_queue,
next_update_id,
updates,
state,
notification_sender,
path: path.as_ref().to_owned(),
},
notification_receiver,
))
}
pub fn open(
options: EnvOpenOptions,
path: impl AsRef<Path>,
index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::new(options, path)?;
let update_store = Arc::new(update_store);
// Send a first notification to trigger the process.
let _ = update_store.notification_sender.send(());
// Init update loop to perform any pending updates at launch.
// Since we just launched the update store, and we still own the receiving end of the
// channel, this call is guaranteed to succeed.
notification_sender
update_store
.notification_sender
.try_send(())
.expect("Failed to init update store");
let update_store = Arc::new(UpdateStore {
env,
pending_queue,
next_update_id,
updates,
state,
notification_sender,
});
// We need a weak reference so we can take ownership on the arc later when we
// want to close the index.
let update_store_weak = Arc::downgrade(&update_store);
@ -233,7 +171,7 @@ impl UpdateStore {
match res {
Ok(Some(_)) => (),
Ok(None) => break,
Err(e) => eprintln!("error while processing update: {}", e),
Err(e) => error!("error while processing update: {}", e),
}
}
// the ownership on the arc has been taken, we need to exit.
@ -253,21 +191,31 @@ impl UpdateStore {
.get(txn, &NextIdKey::Global)?
.map(U64::get)
.unwrap_or_default();
self.next_update_id
.put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
let update_id = self.next_update_id_raw(txn, index_uuid)?;
Ok((global_id, update_id))
}
/// Returns the next next update id for a given `index_uuid` without
/// incrementing the global update id. This is useful for the dumps.
fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
let update_id = self
.next_update_id
.get(txn, &NextIdKey::Index(index_uuid))?
.map(U64::get)
.unwrap_or_default();
self.next_update_id
.put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
self.next_update_id.put(
txn,
&NextIdKey::Index(index_uuid),
&BEU64::new(update_id + 1),
)?;
Ok((global_id, update_id))
Ok(update_id)
}
/// Registers the update content in the pending store and the meta
@ -275,13 +223,13 @@ impl UpdateStore {
pub fn register_update(
&self,
meta: UpdateMeta,
content: Option<impl AsRef<Path>>,
content: Option<Uuid>,
index_uuid: Uuid,
) -> heed::Result<Enqueued> {
let mut txn = self.env.write_txn()?;
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
let meta = Enqueued::new(meta, update_id, content);
self.pending_queue
.put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@ -294,6 +242,35 @@ impl UpdateStore {
Ok(meta)
}
/// Push already processed update in the UpdateStore without triggering the notification
/// process. This is useful for the dumps.
pub fn register_raw_updates(
&self,
wtxn: &mut heed::RwTxn,
update: &UpdateStatus,
index_uuid: Uuid,
) -> heed::Result<()> {
match update {
UpdateStatus::Enqueued(enqueued) => {
let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
wtxn,
&(global_id, index_uuid, enqueued.id()),
&enqueued,
)?;
}
_ => {
let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
self.updates.remap_key_type::<UpdateKeyCodec>().put(
wtxn,
&(index_uuid, update.id()),
&update,
)?;
}
}
Ok(())
}
/// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed.
@ -314,13 +291,14 @@ impl UpdateStore {
let processing = pending.processing();
// Acquire the state lock and set the current state to processing.
// txn must *always* be acquired after state lock, or it will dead lock.
let state = self.state.write();
state.swap(State::Processing(index_uuid, processing.clone()));
let file = match content_path {
Some(ref path) => {
let file = File::open(path)
.with_context(|| format!("file at path: {:?}", &content_path))?;
Some(uuid) => {
let path = update_uuid_to_file_path(&self.path, uuid);
let file = File::open(path)?;
Some(file)
}
None => None,
@ -336,7 +314,8 @@ impl UpdateStore {
self.pending_queue
.delete(&mut wtxn, &(global_id, index_uuid, update_id))?;
if let Some(path) = content_path {
if let Some(uuid) = content_path {
let path = update_uuid_to_file_path(&self.path, uuid);
remove_file(&path)?;
}
@ -436,7 +415,7 @@ impl UpdateStore {
pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
let mut txn = self.env.write_txn()?;
// Contains all the content file paths that we need to be removed if the deletion was successful.
let mut paths_to_remove = Vec::new();
let mut uuids_to_remove = Vec::new();
let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
@ -444,8 +423,8 @@ impl UpdateStore {
if uuid == index_uuid {
pendings.del_current()?;
let mut pending = pending.decode()?;
if let Some(path) = pending.content.take() {
paths_to_remove.push(path);
if let Some(update_uuid) = pending.content.take() {
uuids_to_remove.push(update_uuid);
}
}
}
@ -465,9 +444,12 @@ impl UpdateStore {
txn.commit()?;
paths_to_remove.iter().for_each(|path| {
let _ = remove_file(path);
});
uuids_to_remove
.iter()
.map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
.for_each(|path| {
let _ = remove_file(path);
});
// We don't care about the currently processing update, since it will be removed by itself
// once its done processing, and we can't abort a running update.
@ -496,7 +478,7 @@ impl UpdateStore {
// create db snapshot
self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files");
let update_files_path = update_path.join(UPDATE_DIR);
create_dir_all(&update_files_path)?;
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
@ -504,10 +486,13 @@ impl UpdateStore {
for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) {
if let Some(path) = pending.decode()?.content_path() {
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
if let Enqueued {
content: Some(uuid),
..
} = pending.decode()?
{
let path = update_uuid_to_file_path(&self.path, uuid);
copy(path, &update_files_path)?;
}
}
}
@ -533,14 +518,17 @@ impl UpdateStore {
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
let mut size = self.env.size();
let txn = self.env.read_txn()?;
for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?;
if let Some(path) = pending.content_path() {
if let Enqueued {
content: Some(uuid),
..
} = pending
{
let path = update_uuid_to_file_path(&self.path, uuid);
size += File::open(path)?.metadata()?.len();
}
}
let processing = match *self.state.read() {
State::Processing(uuid, _) => Some(uuid),
_ => None,
@ -550,6 +538,12 @@ impl UpdateStore {
}
}
fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
root.as_ref()
.join(UPDATE_DIR)
.join(format!("update_{}", uuid))
}
#[cfg(test)]
mod test {
use super::*;
@ -595,9 +589,7 @@ mod test {
let uuid = Uuid::new_v4();
let store_clone = update_store.clone();
tokio::task::spawn_blocking(move || {
store_clone
.register_update(meta, Some("here"), uuid)
.unwrap();
store_clone.register_update(meta, None, uuid).unwrap();
})
.await
.unwrap();

View File

@ -1,10 +1,9 @@
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::index::{Checked, Settings};
use crate::index::{Unchecked, Settings};
pub type UpdateError = String;
@ -25,7 +24,7 @@ pub enum UpdateMeta {
},
ClearDocuments,
DeleteDocuments,
Settings(Settings<Checked>),
Settings(Settings<Unchecked>),
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -34,11 +33,11 @@ pub struct Enqueued {
pub update_id: u64,
pub meta: UpdateMeta,
pub enqueued_at: DateTime<Utc>,
pub content: Option<PathBuf>,
pub content: Option<Uuid>,
}
impl Enqueued {
pub fn new(meta: UpdateMeta, update_id: u64, content: Option<PathBuf>) -> Self {
pub fn new(meta: UpdateMeta, update_id: u64, content: Option<Uuid>) -> Self {
Self {
enqueued_at: Utc::now(),
meta,
@ -68,10 +67,6 @@ impl Enqueued {
pub fn id(&self) -> u64 {
self.update_id
}
pub fn content_path(&self) -> Option<&Path> {
self.content.as_deref()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -152,7 +147,7 @@ impl Failed {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus {
Processing(Processing),

View File

@ -4,7 +4,7 @@ use log::{info, warn};
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{Result, UuidError, UuidResolveMsg, UuidStore};
use super::{Result, UuidResolveMsg, UuidResolverError, UuidStore};
pub struct UuidResolverActor<S> {
inbox: mpsc::Receiver<UuidResolveMsg>,
@ -44,6 +44,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(GetSize { ret }) => {
let _ = ret.send(self.handle_get_size().await);
}
Some(DumpRequest { path, ret }) => {
let _ = ret.send(self.handle_dump(path).await);
}
// all senders have been dropped, need to quit.
None => break,
}
@ -54,7 +57,7 @@ impl<S: UuidStore> UuidResolverActor<S> {
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
@ -63,14 +66,14 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store
.get_uuid(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
.ok_or(UuidResolverError::UnexistingIndex(uid))
}
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
self.store
.delete(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
.ok_or(UuidResolverError::UnexistingIndex(uid))
}
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
@ -82,9 +85,13 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store.snapshot(path).await
}
async fn handle_dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
self.store.dump(path).await
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
return Err(UuidResolverError::BadlyFormatted(uid));
}
self.store.insert(uid, uuid).await?;
Ok(())

View File

@ -85,4 +85,12 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::DumpRequest { ret, path };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
}

View File

@ -34,4 +34,8 @@ pub enum UuidResolveMsg {
GetSize {
ret: oneshot::Sender<Result<u64>>,
},
DumpRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
},
}

View File

@ -1,7 +1,7 @@
mod actor;
mod handle_impl;
mod message;
mod store;
pub mod store;
use std::collections::HashSet;
use std::path::PathBuf;
@ -11,16 +11,17 @@ use uuid::Uuid;
use actor::UuidResolverActor;
use message::UuidResolveMsg;
use store::{HeedUuidStore, UuidStore};
use store::UuidStore;
#[cfg(test)]
use mockall::automock;
pub use handle_impl::UuidResolverHandleImpl;
pub use store::HeedUuidStore;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
pub type Result<T> = std::result::Result<T, UuidError>;
pub type Result<T> = std::result::Result<T, UuidResolverError>;
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
@ -32,20 +33,37 @@ pub trait UuidResolverHandle {
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Debug, Error)]
pub enum UuidError {
pub enum UuidResolverError {
#[error("Name already exist.")]
NameAlreadyExist,
#[error("Index \"{0}\" doesn't exist.")]
UnexistingIndex(String),
#[error("Error performing task: {0}")]
TokioTask(#[from] tokio::task::JoinError),
#[error("Database error: {0}")]
Heed(#[from] heed::Error),
#[error("Uuid error: {0}")]
Uuid(#[from] uuid::Error),
#[error("Badly formatted index uid: {0}")]
BadlyFormatted(String),
#[error("Internal error resolving index uid: {0}")]
Internal(String),
}
macro_rules! internal_error {
($($other:path), *) => {
$(
impl From<$other> for UuidResolverError {
fn from(other: $other) -> Self {
Self::Internal(other.to_string())
}
}
)*
}
}
internal_error!(
heed::Error,
uuid::Error,
std::io::Error,
tokio::task::JoinError,
serde_json::Error
);

View File

@ -1,18 +1,26 @@
use std::path::{Path, PathBuf};
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use heed::{
types::{ByteSlice, Str},
CompactionOption, Database, Env, EnvOpenOptions,
};
use heed::types::{ByteSlice, Str};
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{Result, UuidError, UUID_STORE_SIZE};
use super::{Result, UuidResolverError, UUID_STORE_SIZE};
use crate::helpers::EnvSizer;
#[derive(Serialize, Deserialize)]
struct DumpEntry {
uuid: Uuid,
uid: String,
}
const UUIDS_DB_PATH: &str = "index_uuids";
#[async_trait::async_trait]
pub trait UuidStore {
pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
@ -22,8 +30,10 @@ pub trait UuidStore {
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
}
#[derive(Clone)]
pub struct HeedUuidStore {
env: Env,
db: Database<Str, ByteSlice>,
@ -31,7 +41,7 @@ pub struct HeedUuidStore {
impl HeedUuidStore {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref().join("index_uuids");
let path = path.as_ref().join(UUIDS_DB_PATH);
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
@ -39,123 +49,198 @@ impl HeedUuidStore {
let db = env.create_database(None)?;
Ok(Self { env, db })
}
pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidResolverError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
}
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
Ok(Some(uuid))
}
None => Ok(None),
}
}
pub fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(uuid))
}
None => Ok(None),
}
}
pub fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push((name.to_owned(), uuid))
}
Ok(entries)
}
pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
}
pub fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
let env = self.env.clone();
let db = self.db;
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = HashSet::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.insert(uuid);
}
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push(UUIDS_DB_PATH);
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
}
pub fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
}
pub fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let dump_path = path.join(UUIDS_DB_PATH);
create_dir_all(&dump_path)?;
let dump_file_path = dump_path.join("data.jsonl");
let mut dump_file = File::create(&dump_file_path)?;
let mut uuids = HashSet::new();
let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? {
let (uid, uuid) = entry?;
let uid = uid.to_string();
let uuid = Uuid::from_slice(uuid)?;
let entry = DumpEntry { uuid, uid };
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write_all(b"\n").unwrap();
uuids.insert(uuid);
}
Ok(uuids)
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH);
std::fs::create_dir_all(&uuid_resolver_path)?;
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);
let mut line = String::new();
let db = Self::new(dst)?;
let mut txn = db.env.write_txn()?;
loop {
match indexes.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
println!("importing {} {}", uid, uuid);
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
}
Err(e) => return Err(e.into()),
}
line.clear();
}
txn.commit()?;
db.env.prepare_for_closing().wait();
Ok(())
}
}
#[async_trait::async_trait]
impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
})
.await?
let this = self.clone();
tokio::task::spawn_blocking(move || this.create_uuid(name, err)).await?
}
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
let this = self.clone();
tokio::task::spawn_blocking(move || this.get_uuid(name)).await?
}
async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
let this = self.clone();
tokio::task::spawn_blocking(move || this.delete(uid)).await?
}
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push((name.to_owned(), uuid))
}
Ok(entries)
})
.await?
let this = self.clone();
tokio::task::spawn_blocking(move || this.list()).await?
}
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
})
.await?
let this = self.clone();
tokio::task::spawn_blocking(move || this.insert(name, uuid)).await?
}
async fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = HashSet::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.insert(uuid);
}
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push("index_uuids");
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
})
.await?
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.snapshot(path)).await?
}
async fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
self.get_size()
}
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.dump(path)).await?
}
}

View File

@ -46,8 +46,8 @@ macro_rules! create_app {
.configure(synonym::services)
.configure(health::services)
.configure(stats::services)
.configure(key::services);
//.configure(routes::dump::services);
.configure(key::services)
.configure(dump::services);
#[cfg(feature = "mini-dashboard")]
let app = if $enable_frontend {
let generated = dashboard::generate();
@ -62,11 +62,11 @@ macro_rules! create_app {
app.wrap(
Cors::default()
.send_wildcard()
.allowed_headers(vec!["content-type", "x-meili-api-key"])
.allow_any_origin()
.allow_any_method()
.max_age(86_400) // 24h
.send_wildcard()
.allowed_headers(vec!["content-type", "x-meili-api-key"])
.allow_any_origin()
.allow_any_method()
.max_age(86_400), // 24h
)
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())

View File

@ -202,10 +202,6 @@ pub struct Opt {
#[structopt(long, conflicts_with = "import-snapshot")]
pub import_dump: Option<PathBuf>,
/// The batch size used in the importation process, the bigger it is the faster the dump is created.
#[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")]
pub dump_batch_size: usize,
#[structopt(flatten)]
pub indexer_options: IndexerOpts,
}

View File

@ -1,25 +1,20 @@
use std::fs::File;
use std::path::Path;
use actix_web::{get, post};
use actix_web::{HttpResponse, web};
use actix_web::HttpResponse;
use actix_web::{get, post, web};
use serde::{Deserialize, Serialize};
use crate::dump::{DumpInfo, DumpStatus, compressed_dumps_dir, init_dump_process};
use crate::Data;
use crate::error::{Error, ResponseError};
use crate::error::ResponseError;
use crate::helpers::Authentication;
use crate::Data;
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(trigger_dump)
.service(get_dump_status);
cfg.service(create_dump).service(get_dump_status);
}
#[post("/dumps", wrap = "Authentication::Private")]
async fn trigger_dump(
data: web::Data<Data>,
) -> Result<HttpResponse, ResponseError> {
todo!()
async fn create_dump(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
let res = data.create_dump().await?;
Ok(HttpResponse::Accepted().json(res))
}
#[derive(Debug, Serialize)]
@ -38,5 +33,7 @@ async fn get_dump_status(
data: web::Data<Data>,
path: web::Path<DumpParam>,
) -> Result<HttpResponse, ResponseError> {
todo!()
let res = data.dump_status(path.dump_uid.clone()).await?;
Ok(HttpResponse::Ok().json(res))
}

View File

@ -1,6 +1,7 @@
use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use serde::Deserialize;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::error::ResponseError;
use crate::helpers::Authentication;
@ -68,6 +69,16 @@ struct UpdateIndexRequest {
primary_key: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateIndexResponse {
name: String,
uid: String,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
primary_key: Option<String>,
}
#[put("/indexes/{index_uid}", wrap = "Authentication::Private")]
async fn update_index(
data: web::Data<Data>,

View File

@ -2,6 +2,7 @@ use actix_web::{get, HttpResponse};
use serde::{Deserialize, Serialize};
pub mod document;
pub mod dump;
pub mod health;
pub mod index;
pub mod key;
@ -9,7 +10,6 @@ pub mod search;
pub mod settings;
pub mod stats;
pub mod synonym;
//pub mod dump;
#[derive(Deserialize)]
pub struct IndexParam {

View File

@ -1,9 +1,9 @@
use actix_web::{delete, get, post, web, HttpResponse};
use crate::{error::ResponseError, index::Unchecked};
use crate::helpers::Authentication;
use crate::index::Settings;
use crate::Data;
use crate::{error::ResponseError, index::Unchecked};
#[macro_export]
macro_rules! make_setting_route {

View File

@ -47,7 +47,7 @@ impl Index<'_> {
update_id as u64
}
pub async fn create(& self, primary_key: Option<&str>) -> (Value, StatusCode) {
pub async fn create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
let body = json!({
"uid": self.uid,
"primaryKey": primary_key,

View File

@ -44,7 +44,7 @@ impl Server {
}
/// Returns a view to an index. There is no guarantee that the index exists.
pub fn index(& self, uid: impl AsRef<str>) -> Index<'_> {
pub fn index(&self, uid: impl AsRef<str>) -> Index<'_> {
Index {
uid: encode(uid.as_ref()),
service: &self.service,
@ -68,7 +68,6 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
Opt {
db_path: dir.as_ref().join("db"),
dumps_dir: dir.as_ref().join("dump"),
dump_batch_size: 16,
http_addr: "127.0.0.1:7700".to_owned(),
master_key: None,
env: "development".to_owned(),

View File

@ -73,7 +73,7 @@ async fn reset_all_settings() {
let server = Server::new().await;
let index = server.index("test");
index
.update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"] }))
.update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"], "attributesForFaceting": { "toto": "string" } }))
.await;
index.wait_update_id(0).await;
let (response, code) = index.settings().await;
@ -81,6 +81,7 @@ async fn reset_all_settings() {
assert_eq!(response["displayedAttributes"], json!(["foo"]));
assert_eq!(response["searchableAttributes"], json!(["bar"]));
assert_eq!(response["stopWords"], json!(["the"]));
assert_eq!(response["attributesForFaceting"], json!({"toto": "string"}));
index.delete_settings().await;
index.wait_update_id(1).await;
@ -90,6 +91,7 @@ async fn reset_all_settings() {
assert_eq!(response["displayedAttributes"], json!(["*"]));
assert_eq!(response["searchableAttributes"], json!(["*"]));
assert_eq!(response["stopWords"], json!([]));
assert_eq!(response["attributesForFaceting"], json!({}));
}
#[actix_rt::test]