refactor meilisearch

This commit is contained in:
mpostma 2021-09-14 18:39:02 +02:00
parent 6fafdb7711
commit e14640e530
33 changed files with 1222 additions and 1166 deletions

104
Cargo.lock generated
View File

@ -235,6 +235,15 @@ dependencies = [
"path-slash", "path-slash",
] ]
[[package]]
name = "addr2line"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e61f2b7f93d2c7d2b08263acaa4a363b3e276806c68af6134c44f523bf1aacd"
dependencies = [
"gimli",
]
[[package]] [[package]]
name = "adler" name = "adler"
version = "1.0.2" version = "1.0.2"
@ -281,6 +290,9 @@ name = "anyhow"
version = "1.0.44" version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1"
dependencies = [
"backtrace",
]
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
@ -346,6 +358,21 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backtrace"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01"
dependencies = [
"addr2line",
"cc",
"cfg-if 1.0.0",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "base-x" name = "base-x"
version = "0.2.8" version = "0.2.8"
@ -358,6 +385,15 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bimap"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "bincode" name = "bincode"
version = "1.3.3" version = "1.3.3"
@ -432,7 +468,6 @@ dependencies = [
"lazy_static", "lazy_static",
"memchr", "memchr",
"regex-automata", "regex-automata",
"serde",
] ]
[[package]] [[package]]
@ -734,28 +769,6 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "csv"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.16" version = "0.99.16"
@ -1089,6 +1102,12 @@ dependencies = [
"syn 1.0.76", "syn 1.0.76",
] ]
[[package]]
name = "gimli"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7"
[[package]] [[package]]
name = "git2" name = "git2"
version = "0.13.22" version = "0.13.22"
@ -1618,6 +1637,7 @@ dependencies = [
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream",
"urlencoding", "urlencoding",
"uuid", "uuid",
"vergen", "vergen",
@ -1670,14 +1690,15 @@ dependencies = [
[[package]] [[package]]
name = "milli" name = "milli"
version = "0.13.1" version = "0.13.1"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.13.1#90d64d257fa944ab2ee1572193e501bb231627c7" source = "git+https://github.com/meilisearch/milli.git?rev=6de1b41#6de1b41f791e7d117634e63783d78b29b5228a99"
dependencies = [ dependencies = [
"bimap",
"bincode",
"bstr", "bstr",
"byteorder", "byteorder",
"chrono", "chrono",
"concat-arrays", "concat-arrays",
"crossbeam-channel", "crossbeam-channel",
"csv",
"either", "either",
"flate2", "flate2",
"fst", "fst",
@ -1706,6 +1727,7 @@ dependencies = [
"smallvec", "smallvec",
"tempfile", "tempfile",
"uuid", "uuid",
"vec-utils",
] ]
[[package]] [[package]]
@ -1827,6 +1849,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "object"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "obkv" name = "obkv"
version = "0.2.0" version = "0.2.0"
@ -2367,6 +2398,12 @@ dependencies = [
"retain_mut", "retain_mut",
] ]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]] [[package]]
name = "rustc_version" name = "rustc_version"
version = "0.2.3" version = "0.2.3"
@ -2959,6 +2996,17 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.6.8" version = "0.6.8"
@ -3126,6 +3174,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vec-utils"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dac984aa016c26ef4ed7b2c30d6a1bd570fd40a078caccaf6415a2ac5d96161"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"

View File

@ -25,7 +25,7 @@ zip = { version = "0.5.13", optional = true }
actix-cors = { git = "https://github.com/MarinPostma/actix-extras.git", rev = "963ac94d" } actix-cors = { git = "https://github.com/MarinPostma/actix-extras.git", rev = "963ac94d" }
actix-web = { version = "4.0.0-beta.9", features = ["rustls"] } actix-web = { version = "4.0.0-beta.9", features = ["rustls"] }
actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "39d8006", optional = true } actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "39d8006", optional = true }
anyhow = "1.0.43" anyhow = { version = "1.0.43", features = ["backtrace"] }
async-stream = "0.3.2" async-stream = "0.3.2"
async-trait = "0.1.51" async-trait = "0.1.51"
arc-swap = "1.3.2" arc-swap = "1.3.2"
@ -48,7 +48,7 @@ main_error = "0.1.1"
meilisearch-error = { path = "../meilisearch-error" } meilisearch-error = { path = "../meilisearch-error" }
meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
memmap = "0.7.0" memmap = "0.7.0"
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.13.1" } milli = { git = "https://github.com/meilisearch/milli.git", rev = "6de1b41" }
mime = "0.3.16" mime = "0.3.16"
num_cpus = "1.13.0" num_cpus = "1.13.0"
once_cell = "1.8.0" once_cell = "1.8.0"
@ -75,6 +75,7 @@ whoami = { version = "1.1.3", optional = true }
reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true } reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true }
serdeval = "0.1.0" serdeval = "0.1.0"
sysinfo = "0.20.2" sysinfo = "0.20.2"
tokio-stream = "0.1.7"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2.0" actix-rt = "2.2.0"

View File

@ -5,7 +5,7 @@ use sha2::Digest;
use crate::index::{Checked, Settings}; use crate::index::{Checked, Settings};
use crate::index_controller::{ use crate::index_controller::{
error::Result, DumpInfo, IndexController, IndexMetadata, IndexSettings, IndexStats, Stats, error::Result, DumpInfo, IndexController, IndexMetadata, IndexStats, Stats,
}; };
use crate::option::Opt; use crate::option::Opt;
@ -91,19 +91,19 @@ impl Data {
self.index_controller.get_index(uid).await self.index_controller.get_index(uid).await
} }
pub async fn create_index( //pub async fn create_index(
&self, //&self,
uid: String, //uid: String,
primary_key: Option<String>, //primary_key: Option<String>,
) -> Result<IndexMetadata> { //) -> Result<IndexMetadata> {
let settings = IndexSettings { //let settings = IndexSettings {
uid: Some(uid), //uid: Some(uid),
primary_key, //primary_key,
}; //};
let meta = self.index_controller.create_index(settings).await?; //let meta = self.index_controller.create_index(settings).await?;
Ok(meta) //Ok(meta)
} //}
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> { pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
Ok(self.index_controller.get_index_stats(uid).await?) Ok(self.index_controller.get_index_stats(uid).await?)

View File

@ -1,59 +1,11 @@
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use crate::index_controller::Update;
use crate::extractors::payload::Payload;
use crate::index::{Checked, Settings};
use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus}; use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus};
use crate::Data; use crate::Data;
impl Data { impl Data {
pub async fn add_documents( pub async fn register_update(&self, index_uid: &str, update: Update) -> Result<UpdateStatus> {
&self, let status = self.index_controller.register_update(index_uid, update).await?;
index: String, Ok(status)
method: IndexDocumentsMethod,
format: UpdateFormat,
stream: Payload,
primary_key: Option<String>,
) -> Result<UpdateStatus> {
let update_status = self
.index_controller
.add_documents(index, method, format, stream, primary_key)
.await?;
Ok(update_status)
}
pub async fn update_settings(
&self,
index: String,
settings: Settings<Checked>,
create: bool,
) -> Result<UpdateStatus> {
let update = self
.index_controller
.update_settings(index, settings, create)
.await?;
Ok(update)
}
pub async fn clear_documents(&self, index: String) -> Result<UpdateStatus> {
let update = self.index_controller.clear_documents(index).await?;
Ok(update)
}
pub async fn delete_documents(
&self,
index: String,
document_ids: Vec<String>,
) -> Result<UpdateStatus> {
let update = self
.index_controller
.delete_documents(index, document_ids)
.await?;
Ok(update)
}
pub async fn delete_index(&self, index: String) -> Result<()> {
self.index_controller.delete_index(index).await?;
Ok(())
} }
pub async fn get_update_status(&self, index: String, uid: u64) -> Result<UpdateStatus> { pub async fn get_update_status(&self, index: String, uid: u64) -> Result<UpdateStatus> {

View File

@ -86,7 +86,6 @@ impl ErrorCode for MilliError<'_> {
milli::Error::UserError(ref error) => { milli::Error::UserError(ref error) => {
match error { match error {
// TODO: wait for spec for new error codes. // TODO: wait for spec for new error codes.
UserError::Csv(_)
| UserError::SerdeJson(_) | UserError::SerdeJson(_)
| UserError::MaxDatabaseSizeReached | UserError::MaxDatabaseSizeReached
| UserError::InvalidCriterionName { .. } | UserError::InvalidCriterionName { .. }

View File

@ -1,20 +1,15 @@
use std::fs::{create_dir_all, File}; use std::fs::File;
use std::io::{BufRead, BufReader, Write}; use std::io::Write;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use anyhow::{bail, Context};
use heed::RoTxn; use heed::RoTxn;
use indexmap::IndexMap; use indexmap::IndexMap;
use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use super::error::Result; use super::error::Result;
use super::{update_handler::UpdateHandler, Index, Settings, Unchecked}; use super::{Index, Settings, Unchecked};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct DumpMeta { struct DumpMeta {
@ -80,91 +75,92 @@ impl Index {
} }
pub fn load_dump( pub fn load_dump(
src: impl AsRef<Path>, _src: impl AsRef<Path>,
dst: impl AsRef<Path>, _dst: impl AsRef<Path>,
size: usize, _size: usize,
indexing_options: &IndexerOpts, _indexing_options: &IndexerOpts,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let dir_name = src //let dir_name = src
.as_ref() //.as_ref()
.file_name() //.file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; //.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); //let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?; //create_dir_all(&dst_dir_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME); //let meta_path = src.as_ref().join(META_FILE_NAME);
let mut meta_file = File::open(meta_path)?; //let mut meta_file = File::open(meta_path)?;
// We first deserialize the dump meta into a serde_json::Value and change //// We first deserialize the dump meta into a serde_json::Value and change
// the custom ranking rules settings from the old format to the new format. //// the custom ranking rules settings from the old format to the new format.
let mut meta: Value = serde_json::from_reader(&mut meta_file)?; //let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") { //if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
convert_custom_ranking_rules(ranking_rules); //convert_custom_ranking_rules(ranking_rules);
} //}
// Then we serialize it back into a vec to deserialize it //// Then we serialize it back into a vec to deserialize it
// into a `DumpMeta` struct with the newly patched `rankingRules` format. //// into a `DumpMeta` struct with the newly patched `rankingRules` format.
let patched_meta = serde_json::to_vec(&meta)?; //let patched_meta = serde_json::to_vec(&meta)?;
let DumpMeta { //let DumpMeta {
settings, //settings,
primary_key, //primary_key,
} = serde_json::from_slice(&patched_meta)?; //} = serde_json::from_slice(&patched_meta)?;
let settings = settings.check(); //let settings = settings.check();
let index = Self::open(&dst_dir_path, size)?; //let index = Self::open(&dst_dir_path, size)?;
let mut txn = index.write_txn()?; //let mut txn = index.write_txn()?;
let handler = UpdateHandler::new(indexing_options)?; //let handler = UpdateHandler::new(indexing_options)?;
index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?; //index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
let document_file_path = src.as_ref().join(DATA_FILE_NAME); //let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = File::open(&document_file_path)?; //let reader = File::open(&document_file_path)?;
let mut reader = BufReader::new(reader); //let mut reader = BufReader::new(reader);
reader.fill_buf()?; //reader.fill_buf()?;
// If the document file is empty, we don't perform the document addition, to prevent // If the document file is empty, we don't perform the document addition, to prevent
// a primary key error to be thrown. // a primary key error to be thrown.
if !reader.buffer().is_empty() {
index.update_documents_txn(
&mut txn,
JsonStream,
IndexDocumentsMethod::UpdateDocuments,
Some(reader),
handler.update_builder(0),
primary_key.as_deref(),
)?;
}
txn.commit()?; todo!("fix obk document dumps")
//if !reader.buffer().is_empty() {
//index.update_documents_txn(
//&mut txn,
//IndexDocumentsMethod::UpdateDocuments,
//Some(reader),
//handler.update_builder(0),
//primary_key.as_deref(),
//)?;
//}
match Arc::try_unwrap(index.0) { //txn.commit()?;
Ok(inner) => inner.prepare_for_closing().wait(),
Err(_) => bail!("Could not close index properly."),
}
Ok(()) //match Arc::try_unwrap(index.0) {
//Ok(inner) => inner.prepare_for_closing().wait(),
//Err(_) => bail!("Could not close index properly."),
//}
//Ok(())
} }
} }
/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`. // /// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
/// // ///
/// This is done for compatibility reasons, and to avoid a new dump version, // /// This is done for compatibility reasons, and to avoid a new dump version,
/// since the new syntax was introduced soon after the new dump version. // /// since the new syntax was introduced soon after the new dump version.
fn convert_custom_ranking_rules(ranking_rules: &mut Value) { //fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
*ranking_rules = match ranking_rules.take() { //*ranking_rules = match ranking_rules.take() {
Value::Array(values) => values //Value::Array(values) => values
.into_iter() //.into_iter()
.filter_map(|value| match value { //.filter_map(|value| match value {
Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s) //Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
.map(|f| format!("{}:asc", f)) //.map(|f| format!("{}:asc", f))
.map(Value::String), //.map(Value::String),
Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s) //Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
.map(|f| format!("{}:desc", f)) //.map(|f| format!("{}:desc", f))
.map(Value::String), //.map(Value::String),
otherwise => Some(otherwise), //otherwise => Some(otherwise),
}) //})
.collect(), //.collect(),
otherwise => otherwise, //otherwise => otherwise,
} //}
} //}

View File

@ -15,6 +15,7 @@ pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_
pub use updates::{Checked, Facets, Settings, Unchecked}; pub use updates::{Checked, Facets, Settings, Unchecked};
use crate::helpers::EnvSizer; use crate::helpers::EnvSizer;
use crate::index_controller::update_file_store::UpdateFileStore;
use self::error::IndexError; use self::error::IndexError;
@ -28,23 +29,26 @@ mod updates;
pub type Document = Map<String, Value>; pub type Document = Map<String, Value>;
#[derive(Clone)] #[derive(Clone)]
pub struct Index(pub Arc<milli::Index>); pub struct Index {
pub inner: Arc<milli::Index>,
update_file_store: Arc<UpdateFileStore>,
}
impl Deref for Index { impl Deref for Index {
type Target = milli::Index; type Target = milli::Index;
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
self.0.as_ref() self.inner.as_ref()
} }
} }
impl Index { impl Index {
pub fn open(path: impl AsRef<Path>, size: usize) -> Result<Self> { pub fn open(path: impl AsRef<Path>, size: usize, update_file_store: Arc<UpdateFileStore>) -> Result<Self> {
create_dir_all(&path)?; create_dir_all(&path)?;
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(size); options.map_size(size);
let index = milli::Index::new(options, &path)?; let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index(Arc::new(index))) Ok(Index { inner, update_file_store })
} }
pub fn settings(&self) -> Result<Settings<Checked>> { pub fn settings(&self) -> Result<Settings<Checked>> {

View File

@ -662,7 +662,7 @@ fn parse_filter_array(
} }
} }
Ok(FilterCondition::from_array(txn, &index.0, ands)?) Ok(FilterCondition::from_array(txn, &index, ands)?)
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,11 +1,9 @@
use std::fs::File;
use crate::index::Index; use crate::index::Index;
use milli::update::UpdateBuilder; use milli::update::UpdateBuilder;
use milli::CompressionType; use milli::CompressionType;
use rayon::ThreadPool; use rayon::ThreadPool;
use crate::index_controller::UpdateMeta; use crate::index_controller::update_actor::RegisterUpdate;
use crate::index_controller::{Failed, Processed, Processing}; use crate::index_controller::{Failed, Processed, Processing};
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
@ -54,31 +52,16 @@ impl UpdateHandler {
pub fn handle_update( pub fn handle_update(
&self, &self,
meta: Processing,
content: Option<File>,
index: Index, index: Index,
meta: Processing,
) -> Result<Processed, Failed> { ) -> Result<Processed, Failed> {
use UpdateMeta::*;
let update_id = meta.id(); let update_id = meta.id();
let update_builder = self.update_builder(update_id); let update_builder = self.update_builder(update_id);
let result = match meta.meta() { let result = match meta.meta() {
DocumentsAddition { RegisterUpdate::DocumentAddition { primary_key, content_uuid, method } => {
method, index.update_documents(*method, *content_uuid, update_builder, primary_key.as_deref())
format, }
primary_key,
} => index.update_documents(
*format,
*method,
content,
update_builder,
primary_key.as_deref(),
),
ClearDocuments => index.clear_documents(update_builder),
DeleteDocuments { ids } => index.delete_documents(ids, update_builder),
Settings(settings) => index.update_settings(&settings.clone().check(), update_builder),
}; };
match result { match result {

View File

@ -1,17 +1,17 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use flate2::read::GzDecoder;
use log::{debug, info, trace}; use log::{debug, info, trace};
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder, UpdateFormat}; use milli::documents::DocumentBatchReader;
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder};
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;
use crate::index_controller::UpdateResult; use crate::index_controller::UpdateResult;
use super::error::Result;
use super::Index; use super::Index;
use super::error::Result;
fn serialize_with_wildcard<S>( fn serialize_with_wildcard<S>(
field: &Setting<Vec<String>>, field: &Setting<Vec<String>>,
@ -162,31 +162,23 @@ pub struct Facets {
impl Index { impl Index {
pub fn update_documents( pub fn update_documents(
&self, &self,
format: UpdateFormat,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
content: Option<impl io::Read>, content_uuid: Uuid,
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
primary_key: Option<&str>, primary_key: Option<&str>,
) -> Result<UpdateResult> { ) -> Result<UpdateResult> {
let mut txn = self.write_txn()?; let mut txn = self.write_txn()?;
let result = self.update_documents_txn( let result = self.update_documents_txn(&mut txn, method, content_uuid, update_builder, primary_key)?;
&mut txn,
format,
method,
content,
update_builder,
primary_key,
)?;
txn.commit()?; txn.commit()?;
Ok(result) Ok(result)
} }
pub fn update_documents_txn<'a, 'b>( pub fn update_documents_txn<'a, 'b>(
&'a self, &'a self,
txn: &mut heed::RwTxn<'a, 'b>, txn: &mut heed::RwTxn<'a, 'b>,
format: UpdateFormat,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
content: Option<impl io::Read>, content_uuid: Uuid,
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
primary_key: Option<&str>, primary_key: Option<&str>,
) -> Result<UpdateResult> { ) -> Result<UpdateResult> {
@ -199,138 +191,132 @@ impl Index {
builder.execute(|_, _| ())?; builder.execute(|_, _| ())?;
} }
let mut builder = update_builder.index_documents(txn, self);
builder.update_format(format);
builder.index_documents_method(method);
let indexing_callback = let indexing_callback =
|indexing_step, update_id| debug!("update {}: {:?}", update_id, indexing_step); |indexing_step, update_id| debug!("update {}: {:?}", update_id, indexing_step);
let gzipped = false; let content_file = self.update_file_store.get_update(content_uuid).unwrap();
let addition = match content { let reader = DocumentBatchReader::from_reader(content_file).unwrap();
Some(content) if gzipped => {
builder.execute(GzDecoder::new(content), indexing_callback)? let mut builder = update_builder.index_documents(txn, self);
} builder.index_documents_method(method);
Some(content) => builder.execute(content, indexing_callback)?, let addition = builder.execute(reader, indexing_callback)?;
None => builder.execute(std::io::empty(), indexing_callback)?,
};
info!("document addition done: {:?}", addition); info!("document addition done: {:?}", addition);
Ok(UpdateResult::DocumentsAddition(addition)) Ok(UpdateResult::DocumentsAddition(addition))
} }
pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<UpdateResult> { //pub fn clear_documents(&self, update_builder: UpdateBuilder) -> Result<UpdateResult> {
// We must use the write transaction of the update here. //// We must use the write transaction of the update here.
let mut wtxn = self.write_txn()?; //let mut wtxn = self.write_txn()?;
let builder = update_builder.clear_documents(&mut wtxn, self); //let builder = update_builder.clear_documents(&mut wtxn, self);
let _count = builder.execute()?; //let _count = builder.execute()?;
wtxn.commit() //wtxn.commit()
.and(Ok(UpdateResult::Other)) //.and(Ok(UpdateResult::Other))
.map_err(Into::into) //.map_err(Into::into)
} //}
pub fn update_settings_txn<'a, 'b>( //pub fn update_settings_txn<'a, 'b>(
&'a self, //&'a self,
txn: &mut heed::RwTxn<'a, 'b>, //txn: &mut heed::RwTxn<'a, 'b>,
settings: &Settings<Checked>, //settings: &Settings<Checked>,
update_builder: UpdateBuilder, //update_builder: UpdateBuilder,
) -> Result<UpdateResult> { //) -> Result<UpdateResult> {
// We must use the write transaction of the update here. //// We must use the write transaction of the update here.
let mut builder = update_builder.settings(txn, self); //let mut builder = update_builder.settings(txn, self);
match settings.searchable_attributes { //match settings.searchable_attributes {
Setting::Set(ref names) => builder.set_searchable_fields(names.clone()), //Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
Setting::Reset => builder.reset_searchable_fields(), //Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.displayed_attributes { //match settings.displayed_attributes {
Setting::Set(ref names) => builder.set_displayed_fields(names.clone()), //Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
Setting::Reset => builder.reset_displayed_fields(), //Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.filterable_attributes { //match settings.filterable_attributes {
Setting::Set(ref facets) => { //Setting::Set(ref facets) => {
builder.set_filterable_fields(facets.clone().into_iter().collect()) //builder.set_filterable_fields(facets.clone().into_iter().collect())
} //}
Setting::Reset => builder.reset_filterable_fields(), //Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.sortable_attributes { //match settings.sortable_attributes {
Setting::Set(ref fields) => { //Setting::Set(ref fields) => {
builder.set_sortable_fields(fields.iter().cloned().collect()) //builder.set_sortable_fields(fields.iter().cloned().collect())
} //}
Setting::Reset => builder.reset_sortable_fields(), //Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.ranking_rules { //match settings.ranking_rules {
Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()), //Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
Setting::Reset => builder.reset_criteria(), //Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.stop_words { //match settings.stop_words {
Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()), //Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
Setting::Reset => builder.reset_stop_words(), //Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.synonyms { //match settings.synonyms {
Setting::Set(ref synonyms) => { //Setting::Set(ref synonyms) => {
builder.set_synonyms(synonyms.clone().into_iter().collect()) //builder.set_synonyms(synonyms.clone().into_iter().collect())
} //}
Setting::Reset => builder.reset_synonyms(), //Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
match settings.distinct_attribute { //match settings.distinct_attribute {
Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()), //Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
Setting::Reset => builder.reset_distinct_field(), //Setting::Reset => builder.reset_distinct_field(),
Setting::NotSet => (), //Setting::NotSet => (),
} //}
builder.execute(|indexing_step, update_id| { //builder.execute(|indexing_step, update_id| {
debug!("update {}: {:?}", update_id, indexing_step) //debug!("update {}: {:?}", update_id, indexing_step)
})?; //})?;
Ok(UpdateResult::Other) //Ok(UpdateResult::Other)
} //}
pub fn update_settings( //pub fn update_settings(
&self, //&self,
settings: &Settings<Checked>, //settings: &Settings<Checked>,
update_builder: UpdateBuilder, //update_builder: UpdateBuilder,
) -> Result<UpdateResult> { //) -> Result<UpdateResult> {
let mut txn = self.write_txn()?; //let mut txn = self.write_txn()?;
let result = self.update_settings_txn(&mut txn, settings, update_builder)?; //let result = self.update_settings_txn(&mut txn, settings, update_builder)?;
txn.commit()?; //txn.commit()?;
Ok(result) //Ok(result)
} //}
pub fn delete_documents( //pub fn delete_documents(
&self, //&self,
document_ids: &[String], //document_ids: &[String],
update_builder: UpdateBuilder, //update_builder: UpdateBuilder,
) -> Result<UpdateResult> { //) -> Result<UpdateResult> {
let mut txn = self.write_txn()?; //let mut txn = self.write_txn()?;
let mut builder = update_builder.delete_documents(&mut txn, self)?; //let mut builder = update_builder.delete_documents(&mut txn, self)?;
// We ignore unexisting document ids //// We ignore unexisting document ids
document_ids.iter().for_each(|id| { //document_ids.iter().for_each(|id| {
builder.delete_external_id(id); //builder.delete_external_id(id);
}); //});
let deleted = builder.execute()?; //let deleted = builder.execute()?;
txn.commit() //txn.commit()
.and(Ok(UpdateResult::DocumentDeletion { deleted })) //.and(Ok(UpdateResult::DocumentDeletion { deleted }))
.map_err(Into::into) //.map_err(Into::into)
} //}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,6 +1,5 @@
use std::path::Path; use std::path::Path;
use actix_web::web::Bytes;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use super::error::Result; use super::error::Result;
@ -32,7 +31,7 @@ impl DumpActorHandleImpl {
pub fn new( pub fn new(
path: impl AsRef<Path>, path: impl AsRef<Path>,
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl, uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>, update: crate::index_controller::update_actor::UpdateActorHandleImpl,
index_db_size: usize, index_db_size: usize,
update_db_size: usize, update_db_size: usize,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {

View File

@ -1,20 +1,16 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::fs::{create_dir_all, File};
use std::io::BufRead;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
use heed::EnvOpenOptions;
use log::{error, info, warn}; use log::{error, info, warn};
use milli::update::{IndexDocumentsMethod, Setting, UpdateFormat}; use milli::update::Setting;
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata}; use crate::index_controller::{self, uuid_resolver::HeedUuidStore, IndexMetadata};
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
use crate::{ use crate::{
index::{update_handler::UpdateHandler, Index, Unchecked}, index::Unchecked,
option::IndexerOpts, option::IndexerOpts,
}; };
@ -86,57 +82,57 @@ struct Settings {
} }
fn load_index( fn load_index(
src: impl AsRef<Path>, _src: impl AsRef<Path>,
dst: impl AsRef<Path>, _dst: impl AsRef<Path>,
uuid: Uuid, _uuid: Uuid,
primary_key: Option<&str>, _primary_key: Option<&str>,
size: usize, _size: usize,
indexer_options: &IndexerOpts, _indexer_options: &IndexerOpts,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid)); todo!("fix dump obkv documents")
//let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
create_dir_all(&index_path)?; //create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new(); //let mut options = EnvOpenOptions::new();
options.map_size(size); //options.map_size(size);
let index = milli::Index::new(options, index_path)?; //let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index)); //let index = Index(Arc::new(index));
// extract `settings.json` file and import content //// extract `settings.json` file and import content
let settings = import_settings(&src)?; //let settings = import_settings(&src)?;
let settings: index_controller::Settings<Unchecked> = settings.into(); //let settings: index_controller::Settings<Unchecked> = settings.into();
let mut txn = index.write_txn()?; //let mut txn = index.write_txn()?;
let handler = UpdateHandler::new(indexer_options)?; //let handler = UpdateHandler::new(indexer_options)?;
index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?; //index.update_settings_txn(&mut txn, &settings.check(), handler.update_builder(0))?;
let file = File::open(&src.as_ref().join("documents.jsonl"))?; //let file = File::open(&src.as_ref().join("documents.jsonl"))?;
let mut reader = std::io::BufReader::new(file); //let mut reader = std::io::BufReader::new(file);
reader.fill_buf()?; //reader.fill_buf()?;
if !reader.buffer().is_empty() { //if !reader.buffer().is_empty() {
index.update_documents_txn( //index.update_documents_txn(
&mut txn, //&mut txn,
UpdateFormat::JsonStream, //IndexDocumentsMethod::ReplaceDocuments,
IndexDocumentsMethod::ReplaceDocuments, //Some(reader),
Some(reader), //handler.update_builder(0),
handler.update_builder(0), //primary_key,
primary_key, //)?;
)?; //}
}
txn.commit()?; //txn.commit()?;
// Finaly, we extract the original milli::Index and close it //// Finaly, we extract the original milli::Index and close it
Arc::try_unwrap(index.0) //Arc::try_unwrap(index.0)
.map_err(|_e| "Couldn't close the index properly") //.map_err(|_e| "Couldn't close the index properly")
.unwrap() //.unwrap()
.prepare_for_closing() //.prepare_for_closing()
.wait(); //.wait();
// Updates are ignored in dumps V1. //// Updates are ignored in dumps V1.
Ok(()) //Ok(())
} }
/// we need to **always** be able to convert the old settings to the settings currently being used /// we need to **always** be able to convert the old settings to the settings currently being used
@ -203,15 +199,15 @@ impl From<Settings> for index_controller::Settings<Unchecked> {
} }
} }
/// Extract Settings from `settings.json` file present at provided `dir_path` // /// Extract Settings from `settings.json` file present at provided `dir_path`
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> { //fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
let path = dir_path.as_ref().join("settings.json"); //let path = dir_path.as_ref().join("settings.json");
let file = File::open(path)?; //let file = File::open(path)?;
let reader = std::io::BufReader::new(file); //let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?; //let metadata = serde_json::from_reader(reader)?;
Ok(metadata) //Ok(metadata)
} //}
#[cfg(test)] #[cfg(test)]
mod test { mod test {

View File

@ -1,4 +1,3 @@
use std::fs::File;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
@ -39,6 +38,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
let update_handler = UpdateHandler::new(options)?; let update_handler = UpdateHandler::new(options)?;
let update_handler = Arc::new(update_handler); let update_handler = Arc::new(update_handler);
let receiver = Some(receiver); let receiver = Some(receiver);
Ok(Self { Ok(Self {
receiver, receiver,
update_handler, update_handler,
@ -82,10 +82,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Update { Update {
ret, ret,
meta, meta,
data,
uuid, uuid,
} => { } => {
let _ = ret.send(self.handle_update(uuid, meta, data).await); let _ = ret.send(self.handle_update(uuid, meta).await);
} }
Search { ret, query, uuid } => { Search { ret, query, uuid } => {
let _ = ret.send(self.handle_search(uuid, query).await); let _ = ret.send(self.handle_search(uuid, query).await);
@ -165,7 +164,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
&self, &self,
uuid: Uuid, uuid: Uuid,
meta: Processing, meta: Processing,
data: Option<File>,
) -> Result<std::result::Result<Processed, Failed>> { ) -> Result<std::result::Result<Processed, Failed>> {
debug!("Processing update {}", meta.id()); debug!("Processing update {}", meta.id());
let update_handler = self.update_handler.clone(); let update_handler = self.update_handler.clone();
@ -174,7 +172,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
None => self.store.create(uuid, None).await?, None => self.store.create(uuid, None).await?,
}; };
Ok(spawn_blocking(move || update_handler.handle_update(meta, data, index)).await?) Ok(spawn_blocking(move || update_handler.handle_update(index, meta)).await?)
} }
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> { async fn handle_settings(&self, uuid: Uuid) -> Result<Settings<Checked>> {
@ -230,7 +228,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
if let Some(index) = index { if let Some(index) = index {
tokio::task::spawn(async move { tokio::task::spawn(async move {
let index = index.0; let index = index.inner;
let store = get_arc_ownership_blocking(index).await; let store = get_arc_ownership_blocking(index).await;
spawn_blocking(move || { spawn_blocking(move || {
store.prepare_for_closing().wait(); store.prepare_for_closing().wait();

View File

@ -38,13 +38,11 @@ impl IndexActorHandle for IndexActorHandleImpl {
&self, &self,
uuid: Uuid, uuid: Uuid,
meta: Processing, meta: Processing,
data: Option<std::fs::File>,
) -> Result<std::result::Result<Processed, Failed>> { ) -> Result<std::result::Result<Processed, Failed>> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { let msg = IndexMsg::Update {
ret, ret,
meta, meta,
data,
uuid, uuid,
}; };
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
@ -156,7 +154,7 @@ impl IndexActorHandleImpl {
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path, index_size); let store = MapIndexStore::new(&path, index_size);
let actor = IndexActor::new(receiver, store, options)?; let actor = IndexActor::new(receiver, store, options)?;
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Ok(Self { sender }) Ok(Self { sender })

View File

@ -19,7 +19,6 @@ pub enum IndexMsg {
Update { Update {
uuid: Uuid, uuid: Uuid,
meta: Processing, meta: Processing,
data: Option<std::fs::File>,
ret: oneshot::Sender<IndexResult<Result<Processed, Failed>>>, ret: oneshot::Sender<IndexResult<Result<Processed, Failed>>>,
}, },
Search { Search {

View File

@ -1,4 +1,3 @@
use std::fs::File;
use std::path::PathBuf; use std::path::PathBuf;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
@ -59,7 +58,6 @@ pub trait IndexActorHandle {
&self, &self,
uuid: Uuid, uuid: Uuid,
meta: Processing, meta: Processing,
data: Option<File>,
) -> Result<std::result::Result<Processed, Failed>>; ) -> Result<std::result::Result<Processed, Failed>>;
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult>; async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult>;
async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>>; async fn settings(&self, uuid: Uuid) -> Result<Settings<Checked>>;

View File

@ -10,6 +10,7 @@ use uuid::Uuid;
use super::error::{IndexActorError, Result}; use super::error::{IndexActorError, Result};
use crate::index::Index; use crate::index::Index;
use crate::index_controller::update_file_store::UpdateFileStore;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
@ -24,16 +25,19 @@ pub struct MapIndexStore {
index_store: AsyncMap<Uuid, Index>, index_store: AsyncMap<Uuid, Index>,
path: PathBuf, path: PathBuf,
index_size: usize, index_size: usize,
update_file_store: Arc<UpdateFileStore>,
} }
impl MapIndexStore { impl MapIndexStore {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> Self { pub fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap());
let path = path.as_ref().join("indexes/"); let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new())); let index_store = Arc::new(RwLock::new(HashMap::new()));
Self { Self {
index_store, index_store,
path, path,
index_size, index_size,
update_file_store,
} }
} }
} }
@ -54,8 +58,9 @@ impl IndexStore for MapIndexStore {
} }
let index_size = self.index_size; let index_size = self.index_size;
let file_store = self.update_file_store.clone();
let index = spawn_blocking(move || -> Result<Index> { let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size)?; let index = Index::open(path, index_size, file_store)?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?; let mut txn = index.write_txn()?;
@ -87,7 +92,8 @@ impl IndexStore for MapIndexStore {
} }
let index_size = self.index_size; let index_size = self.index_size;
let index = spawn_blocking(move || Index::open(path, index_size)).await??; let file_store = self.update_file_store.clone();
let index = spawn_blocking(move || Index::open(path, index_size, file_store)).await??;
self.index_store.write().await.insert(uuid, index.clone()); self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index)) Ok(Some(index))
} }

View File

@ -1,42 +1,43 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use actix_web::web::Bytes; use actix_web::error::PayloadError;
use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use futures::stream::StreamExt; use futures::Stream;
use log::error;
use log::info; use log::info;
use milli::FieldDistribution; use milli::FieldDistribution;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::time::sleep; use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
use dump_actor::DumpActorHandle; use dump_actor::DumpActorHandle;
pub use dump_actor::{DumpInfo, DumpStatus}; pub use dump_actor::{DumpInfo, DumpStatus};
use index_actor::IndexActorHandle; use index_actor::IndexActorHandle;
use snapshot::{load_snapshot, SnapshotService}; use snapshot::load_snapshot;
use update_actor::UpdateActorHandle; use update_actor::UpdateActorHandle;
pub use updates::*; pub use updates::*;
use uuid_resolver::{error::UuidResolverError, UuidResolverHandle}; use uuid_resolver::{error::UuidResolverError, UuidResolverHandle};
use crate::extractors::payload::Payload;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt; use crate::option::Opt;
use error::Result; use error::Result;
use self::dump_actor::load_dump; use self::dump_actor::load_dump;
use self::error::IndexControllerError;
mod dump_actor; mod dump_actor;
pub mod error; pub mod error;
pub mod index_actor; pub mod index_actor;
mod snapshot; mod snapshot;
mod update_actor; pub mod update_actor;
mod updates; mod updates;
mod uuid_resolver; mod uuid_resolver;
pub mod update_file_store;
pub type Payload = Box<dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin>;
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@ -72,10 +73,15 @@ pub struct IndexStats {
pub struct IndexController { pub struct IndexController {
uuid_resolver: uuid_resolver::UuidResolverHandleImpl, uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
index_handle: index_actor::IndexActorHandleImpl, index_handle: index_actor::IndexActorHandleImpl,
update_handle: update_actor::UpdateActorHandleImpl<Bytes>, update_handle: update_actor::UpdateActorHandleImpl,
dump_handle: dump_actor::DumpActorHandleImpl, dump_handle: dump_actor::DumpActorHandleImpl,
} }
pub enum DocumentAdditionFormat {
Json,
}
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Stats { pub struct Stats {
@ -84,6 +90,15 @@ pub struct Stats {
pub indexes: BTreeMap<String, IndexStats>, pub indexes: BTreeMap<String, IndexStats>,
} }
pub enum Update {
DocumentAddition {
payload: Payload,
primary_key: Option<String>,
method: IndexDocumentsMethod,
format: DocumentAdditionFormat,
}
}
impl IndexController { impl IndexController {
pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> { pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
let index_size = options.max_index_size.get_bytes() as usize; let index_size = options.max_index_size.get_bytes() as usize;
@ -125,21 +140,21 @@ impl IndexController {
options.max_udb_size.get_bytes() as usize, options.max_udb_size.get_bytes() as usize,
)?; )?;
if options.schedule_snapshot { //if options.schedule_snapshot {
let snapshot_service = SnapshotService::new( //let snapshot_service = SnapshotService::new(
uuid_resolver.clone(), //uuid_resolver.clone(),
update_handle.clone(), //update_handle.clone(),
Duration::from_secs(options.snapshot_interval_sec), //Duration::from_secs(options.snapshot_interval_sec),
options.snapshot_dir.clone(), //options.snapshot_dir.clone(),
options //options
.db_path //.db_path
.file_name() //.file_name()
.map(|n| n.to_owned().into_string().expect("invalid path")) //.map(|n| n.to_owned().into_string().expect("invalid path"))
.unwrap_or_else(|| String::from("data.ms")), //.unwrap_or_else(|| String::from("data.ms")),
); //);
tokio::task::spawn(snapshot_service.run()); //tokio::task::spawn(snapshot_service.run());
} //}
Ok(Self { Ok(Self {
uuid_resolver, uuid_resolver,
@ -149,132 +164,148 @@ impl IndexController {
}) })
} }
pub async fn add_documents( pub async fn register_update(&self, uid: &str, update: Update) -> Result<UpdateStatus> {
&self, match self.uuid_resolver.get(uid.to_string()).await {
uid: String, Ok(uuid) => {
method: milli::update::IndexDocumentsMethod, let update_result = self.update_handle.update(uuid, update).await?;
format: milli::update::UpdateFormat, Ok(update_result)
payload: Payload, },
primary_key: Option<String>,
) -> Result<UpdateStatus> {
let perform_update = |uuid| async move {
let meta = UpdateMeta::DocumentsAddition {
method,
format,
primary_key,
};
let (sender, receiver) = mpsc::channel(10);
// It is necessary to spawn a local task to send the payload to the update handle to
// prevent dead_locking between the update_handle::update that waits for the update to be
// registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move {
payload
.for_each(|r| async {
let _ = sender.send(r).await;
})
.await
});
// This must be done *AFTER* spawning the task.
self.update_handle.update(meta, receiver, uuid).await
};
match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_update(uuid).await?),
Err(UuidResolverError::UnexistingIndex(name)) => { Err(UuidResolverError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let status = perform_update(uuid).await?; let update_result = self.update_handle.update(uuid, update).await?;
// ignore if index creation fails now, since it may already have been created // ignore if index creation fails now, since it may already have been created
let _ = self.index_handle.create_index(uuid, None).await; let _ = self.index_handle.create_index(uuid, None).await;
self.uuid_resolver.insert(name, uuid).await?; self.uuid_resolver.insert(name, uuid).await?;
Ok(status) Ok(update_result)
} }
Err(e) => Err(e.into()), Err(e) => Err(e.into()),
} }
} }
pub async fn clear_documents(&self, uid: String) -> Result<UpdateStatus> { //pub async fn add_documents(
let uuid = self.uuid_resolver.get(uid).await?; //&self,
let meta = UpdateMeta::ClearDocuments; //uid: String,
let (_, receiver) = mpsc::channel(1); //method: milli::update::IndexDocumentsMethod,
let status = self.update_handle.update(meta, receiver, uuid).await?; //payload: Payload,
Ok(status) //primary_key: Option<String>,
} //) -> Result<UpdateStatus> {
//let perform_update = |uuid| async move {
//let meta = UpdateMeta::DocumentsAddition {
//method,
//primary_key,
//};
//let (sender, receiver) = mpsc::channel(10);
pub async fn delete_documents( //// It is necessary to spawn a local task to send the payload to the update handle to
&self, //// prevent dead_locking between the update_handle::update that waits for the update to be
uid: String, //// registered and the update_actor that waits for the the payload to be sent to it.
documents: Vec<String>, //tokio::task::spawn_local(async move {
) -> Result<UpdateStatus> { //payload
let uuid = self.uuid_resolver.get(uid).await?; //.for_each(|r| async {
let meta = UpdateMeta::DeleteDocuments { ids: documents }; //let _ = sender.send(r).await;
let (_, receiver) = mpsc::channel(1); //})
let status = self.update_handle.update(meta, receiver, uuid).await?; //.await
Ok(status) //});
}
pub async fn update_settings( //// This must be done *AFTER* spawning the task.
&self, //self.update_handle.update(meta, receiver, uuid).await
uid: String, //};
settings: Settings<Checked>,
create: bool,
) -> Result<UpdateStatus> {
let perform_udpate = |uuid| async move {
let meta = UpdateMeta::Settings(settings.into_unchecked());
// Nothing so send, drop the sender right away, as not to block the update actor.
let (_, receiver) = mpsc::channel(1);
self.update_handle.update(meta, receiver, uuid).await
};
match self.uuid_resolver.get(uid).await { //match self.uuid_resolver.get(uid).await {
Ok(uuid) => Ok(perform_udpate(uuid).await?), //Ok(uuid) => Ok(perform_update(uuid).await?),
Err(UuidResolverError::UnexistingIndex(name)) if create => { //Err(UuidResolverError::UnexistingIndex(name)) => {
let uuid = Uuid::new_v4(); //let uuid = Uuid::new_v4();
let status = perform_udpate(uuid).await?; //let status = perform_update(uuid).await?;
// ignore if index creation fails now, since it may already have been created //// ignore if index creation fails now, since it may already have been created
let _ = self.index_handle.create_index(uuid, None).await; //let _ = self.index_handle.create_index(uuid, None).await;
self.uuid_resolver.insert(name, uuid).await?; //self.uuid_resolver.insert(name, uuid).await?;
Ok(status) //Ok(status)
} //}
Err(e) => Err(e.into()), //Err(e) => Err(e.into()),
} //}
} //}
pub async fn create_index(&self, index_settings: IndexSettings) -> Result<IndexMetadata> { //pub async fn clear_documents(&self, uid: String) -> Result<UpdateStatus> {
let IndexSettings { uid, primary_key } = index_settings; //let uuid = self.uuid_resolver.get(uid).await?;
let uid = uid.ok_or(IndexControllerError::MissingUid)?; //let meta = UpdateMeta::ClearDocuments;
let uuid = Uuid::new_v4(); //let (_, receiver) = mpsc::channel(1);
let meta = self.index_handle.create_index(uuid, primary_key).await?; //let status = self.update_handle.update(meta, receiver, uuid).await?;
self.uuid_resolver.insert(uid.clone(), uuid).await?; //Ok(status)
let meta = IndexMetadata { //}
uuid,
name: uid.clone(),
uid,
meta,
};
Ok(meta) //pub async fn delete_documents(
} //&self,
//uid: String,
//documents: Vec<String>,
//) -> Result<UpdateStatus> {
//let uuid = self.uuid_resolver.get(uid).await?;
//let meta = UpdateMeta::DeleteDocuments { ids: documents };
//let (_, receiver) = mpsc::channel(1);
//let status = self.update_handle.update(meta, receiver, uuid).await?;
//Ok(status)
//}
pub async fn delete_index(&self, uid: String) -> Result<()> { //pub async fn update_settings(
let uuid = self.uuid_resolver.delete(uid).await?; //&self,
//uid: String,
//settings: Settings<Checked>,
//create: bool,
//) -> Result<UpdateStatus> {
//let perform_udpate = |uuid| async move {
//let meta = UpdateMeta::Settings(settings.into_unchecked());
//// Nothing so send, drop the sender right away, as not to block the update actor.
//let (_, receiver) = mpsc::channel(1);
//self.update_handle.update(meta, receiver, uuid).await
//};
// We remove the index from the resolver synchronously, and effectively perform the index //match self.uuid_resolver.get(uid).await {
// deletion as a background task. //Ok(uuid) => Ok(perform_udpate(uuid).await?),
let update_handle = self.update_handle.clone(); //Err(UuidResolverError::UnexistingIndex(name)) if create => {
let index_handle = self.index_handle.clone(); //let uuid = Uuid::new_v4();
tokio::spawn(async move { //let status = perform_udpate(uuid).await?;
if let Err(e) = update_handle.delete(uuid).await { //// ignore if index creation fails now, since it may already have been created
error!("Error while deleting index: {}", e); //let _ = self.index_handle.create_index(uuid, None).await;
} //self.uuid_resolver.insert(name, uuid).await?;
if let Err(e) = index_handle.delete(uuid).await { //Ok(status)
error!("Error while deleting index: {}", e); //}
} //Err(e) => Err(e.into()),
}); //}
//}
Ok(()) //pub async fn create_index(&self, index_settings: IndexSettings) -> Result<IndexMetadata> {
} //let IndexSettings { uid, primary_key } = index_settings;
//let uid = uid.ok_or(IndexControllerError::MissingUid)?;
//let uuid = Uuid::new_v4();
//let meta = self.index_handle.create_index(uuid, primary_key).await?;
//self.uuid_resolver.insert(uid.clone(), uuid).await?;
//let meta = IndexMetadata {
//uuid,
//name: uid.clone(),
//uid,
//meta,
//};
//Ok(meta)
//}
//pub async fn delete_index(&self, uid: String) -> Result<()> {
//let uuid = self.uuid_resolver.delete(uid).await?;
//// We remove the index from the resolver synchronously, and effectively perform the index
//// deletion as a background task.
//let update_handle = self.update_handle.clone();
//let index_handle = self.index_handle.clone();
//tokio::spawn(async move {
//if let Err(e) = update_handle.delete(uuid).await {
//error!("Error while deleting index: {}", e);
//}
//if let Err(e) = index_handle.delete(uuid).await {
//error!("Error while deleting index: {}", e);
//}
//});
//Ok(())
//}
pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> { pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> {
let uuid = self.uuid_resolver.get(uid).await?; let uuid = self.uuid_resolver.get(uid).await?;
@ -454,3 +485,7 @@ pub fn desc_ranking_rule(text: &str) -> Option<&str> {
.and_then(|(_, tail)| tail.rsplit_once(")")) .and_then(|(_, tail)| tail.rsplit_once(")"))
.map(|(field, _)| field) .map(|(field, _)| field)
} }
fn update_files_path(path: impl AsRef<Path>) -> PathBuf {
path.as_ref().join("updates/updates_files")
}

View File

@ -1,97 +1,90 @@
use std::path::{Path, PathBuf}; use std::path::Path;
use std::time::Duration;
use anyhow::bail; use anyhow::bail;
use log::{error, info, trace};
use tokio::fs;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle;
use crate::helpers::compression; use crate::helpers::compression;
pub struct SnapshotService<U, R> { //pub struct SnapshotService<U, R> {
uuid_resolver_handle: R, //uuid_resolver_handle: R,
update_handle: U, //update_handle: U,
snapshot_period: Duration, //snapshot_period: Duration,
snapshot_path: PathBuf, //snapshot_path: PathBuf,
db_name: String, //db_name: String,
} //}
impl<U, R> SnapshotService<U, R> //impl<U, R> SnapshotService<U, R>
where //where
U: UpdateActorHandle, //U: UpdateActorHandle,
R: UuidResolverHandle, //R: UuidResolverHandle,
{ //{
pub fn new( //pub fn new(
uuid_resolver_handle: R, //uuid_resolver_handle: R,
update_handle: U, //update_handle: U,
snapshot_period: Duration, //snapshot_period: Duration,
snapshot_path: PathBuf, //snapshot_path: PathBuf,
db_name: String, //db_name: String,
) -> Self { //) -> Self {
Self { //Self {
uuid_resolver_handle, //uuid_resolver_handle,
update_handle, //update_handle,
snapshot_period, //snapshot_period,
snapshot_path, //snapshot_path,
db_name, //db_name,
} //}
} //}
pub async fn run(self) { //pub async fn run(self) {
info!( //info!(
"Snapshot scheduled every {}s.", //"Snapshot scheduled every {}s.",
self.snapshot_period.as_secs() //self.snapshot_period.as_secs()
); //);
loop { //loop {
if let Err(e) = self.perform_snapshot().await { //if let Err(e) = self.perform_snapshot().await {
error!("Error while performing snapshot: {}", e); //error!("Error while performing snapshot: {}", e);
} //}
sleep(self.snapshot_period).await; //sleep(self.snapshot_period).await;
} //}
} //}
async fn perform_snapshot(&self) -> anyhow::Result<()> { //async fn perform_snapshot(&self) -> anyhow::Result<()> {
trace!("Performing snapshot."); //trace!("Performing snapshot.");
let snapshot_dir = self.snapshot_path.clone(); //let snapshot_dir = self.snapshot_path.clone();
fs::create_dir_all(&snapshot_dir).await?; //fs::create_dir_all(&snapshot_dir).await?;
let temp_snapshot_dir = //let temp_snapshot_dir =
spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; //spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); //let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
let uuids = self //let uuids = self
.uuid_resolver_handle //.uuid_resolver_handle
.snapshot(temp_snapshot_path.clone()) //.snapshot(temp_snapshot_path.clone())
.await?; //.await?;
if uuids.is_empty() { //if uuids.is_empty() {
return Ok(()); //return Ok(());
} //}
self.update_handle //self.update_handle
.snapshot(uuids, temp_snapshot_path.clone()) //.snapshot(uuids, temp_snapshot_path.clone())
.await?; //.await?;
let snapshot_dir = self.snapshot_path.clone(); //let snapshot_dir = self.snapshot_path.clone();
let snapshot_path = self //let snapshot_path = self
.snapshot_path //.snapshot_path
.join(format!("{}.snapshot", self.db_name)); //.join(format!("{}.snapshot", self.db_name));
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> { //let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; //let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); //let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; //compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
temp_snapshot_file.persist(&snapshot_path)?; //temp_snapshot_file.persist(&snapshot_path)?;
Ok(snapshot_path) //Ok(snapshot_path)
}) //})
.await??; //.await??;
trace!("Created snapshot in {:?}.", snapshot_path); //trace!("Created snapshot in {:?}.", snapshot_path);
Ok(()) //Ok(())
} //}
} //}
pub fn load_snapshot( pub fn load_snapshot(
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,

View File

@ -1,44 +1,82 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::io::SeekFrom; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use actix_web::error::PayloadError;
use async_stream::stream; use async_stream::stream;
use futures::StreamExt; use bytes::Bytes;
use futures::{Stream, StreamExt};
use log::trace; use log::trace;
use serdeval::*; use milli::documents::DocumentBatchBuilder;
use tokio::fs; use serde_json::{Map, Value};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use super::error::{Result, UpdateActorError}; use super::error::{Result, UpdateActorError};
use super::{PayloadData, UpdateMsg, UpdateStore, UpdateStoreInfo}; use super::RegisterUpdate;
use super::{UpdateMsg, UpdateStore, UpdateStoreInfo, Update};
use crate::index_controller::index_actor::IndexActorHandle; use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::update_file_store::UpdateFileStore;
use crate::index_controller::{DocumentAdditionFormat, Payload, UpdateStatus};
pub struct UpdateActor<D, I> { pub struct UpdateActor<I> {
path: PathBuf,
store: Arc<UpdateStore>, store: Arc<UpdateStore>,
inbox: Option<mpsc::Receiver<UpdateMsg<D>>>, inbox: Option<mpsc::Receiver<UpdateMsg>>,
update_file_store: UpdateFileStore,
index_handle: I, index_handle: I,
must_exit: Arc<AtomicBool>, must_exit: Arc<AtomicBool>,
} }
impl<D, I> UpdateActor<D, I> struct StreamReader<S> {
stream: S,
current: Option<Bytes>,
}
impl<S> StreamReader<S> {
fn new(stream: S) -> Self {
Self { stream, current: None }
}
}
impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read for StreamReader<S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.current.take() {
Some(mut bytes) => {
let copied = bytes.split_to(buf.len());
buf.copy_from_slice(&copied);
if !bytes.is_empty() {
self.current.replace(bytes);
}
Ok(copied.len())
}
None => {
match tokio::runtime::Handle::current().block_on(self.stream.next()) {
Some(Ok(bytes)) => {
self.current.replace(bytes);
self.read(buf)
},
Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
None => return Ok(0),
}
}
}
}
}
impl<I> UpdateActor<I>
where where
D: AsRef<[u8]> + Sized + 'static, I: IndexActorHandle + Clone + Sync + Send + 'static,
I: IndexActorHandle + Clone + Send + Sync + 'static,
{ {
pub fn new( pub fn new(
update_db_size: usize, update_db_size: usize,
inbox: mpsc::Receiver<UpdateMsg<D>>, inbox: mpsc::Receiver<UpdateMsg>,
path: impl AsRef<Path>, path: impl AsRef<Path>,
index_handle: I, index_handle: I,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let path = path.as_ref().join("updates"); let path = path.as_ref().to_owned();
std::fs::create_dir_all(&path)?; std::fs::create_dir_all(&path)?;
let mut options = heed::EnvOpenOptions::new(); let mut options = heed::EnvOpenOptions::new();
@ -47,14 +85,17 @@ where
let must_exit = Arc::new(AtomicBool::new(false)); let must_exit = Arc::new(AtomicBool::new(false));
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?; let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
std::fs::create_dir_all(path.join("update_files"))?;
let inbox = Some(inbox); let inbox = Some(inbox);
let update_file_store = UpdateFileStore::new(&path).unwrap();
Ok(Self { Ok(Self {
path,
store, store,
inbox, inbox,
index_handle, index_handle,
must_exit, must_exit,
update_file_store
}) })
} }
@ -89,11 +130,10 @@ where
match msg { match msg {
Update { Update {
uuid, uuid,
meta, update,
data,
ret, ret,
} => { } => {
let _ = ret.send(self.handle_update(uuid, meta, data).await); let _ = ret.send(self.handle_update(uuid, update).await);
} }
ListUpdates { uuid, ret } => { ListUpdates { uuid, ret } => {
let _ = ret.send(self.handle_list_updates(uuid).await); let _ = ret.send(self.handle_list_updates(uuid).await);
@ -120,90 +160,39 @@ where
async fn handle_update( async fn handle_update(
&self, &self,
uuid: Uuid, index_uuid: Uuid,
meta: UpdateMeta, update: Update,
payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> { ) -> Result<UpdateStatus> {
let file_path = match meta { let registration = match update {
UpdateMeta::DocumentsAddition { .. } => { Update::DocumentAddition { payload, primary_key, method, format } => {
let update_file_id = uuid::Uuid::new_v4(); let content_uuid = match format {
let path = self DocumentAdditionFormat::Json => self.documents_from_json(payload).await?,
.path };
.join(format!("update_files/update_{}", update_file_id));
let mut file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await?;
async fn write_to_file<D>( RegisterUpdate::DocumentAddition { primary_key, method, content_uuid }
file: &mut fs::File,
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<usize>
where
D: AsRef<[u8]> + Sized + 'static,
{
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
let bytes = bytes?;
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref()).await?;
}
file.flush().await?;
Ok(file_len)
}
let file_len = write_to_file(&mut file, payload).await;
match file_len {
Ok(len) if len > 0 => {
let file = file.into_std().await;
Some((file, update_file_id))
}
Err(e) => {
fs::remove_file(&path).await?;
return Err(e);
}
_ => {
fs::remove_file(&path).await?;
None
}
}
} }
_ => None,
}; };
let update_store = self.store.clone(); let store = self.store.clone();
let status = tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration)).await??;
Ok(status.into())
}
async fn documents_from_json(&self, payload: Payload) -> Result<Uuid> {
let file_store = self.update_file_store.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
use std::io::{BufReader, Seek}; let (uuid, mut file) = file_store.new_update().unwrap();
let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap();
// If the payload is empty, ignore the check. let documents: Vec<Map<String, Value>> = serde_json::from_reader(StreamReader::new(payload))?;
let update_uuid = if let Some((mut file, uuid)) = file_path { builder.add_documents(documents).unwrap();
// set the file back to the beginning builder.finish().unwrap();
file.seek(SeekFrom::Start(0))?;
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
// Validate that the payload is in the correct format.
let _: Seq<Map<Str, Any>> = serde_json::from_reader(reader)
.map_err(|e| UpdateActorError::InvalidPayload(Box::new(e)))?;
Some(uuid) file.persist();
} else {
None
};
// The payload is valid, we can register it to the update store. Ok(uuid)
let status = update_store }).await?
.register_update(meta, update_uuid, uuid)
.map(UpdateStatus::Enqueued)?;
Ok(status)
})
.await?
} }
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
@ -267,4 +256,5 @@ where
Ok(info) Ok(info)
} }
} }

View File

@ -4,45 +4,37 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use crate::index_controller::{IndexActorHandle, UpdateStatus}; use crate::index_controller::{IndexActorHandle, Update, UpdateStatus};
use super::error::Result; use super::error::Result;
use super::{PayloadData, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo}; use super::{UpdateActor, UpdateActorHandle, UpdateMsg, UpdateStoreInfo};
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateActorHandleImpl<D> { pub struct UpdateActorHandleImpl {
sender: mpsc::Sender<UpdateMsg<D>>, sender: mpsc::Sender<UpdateMsg>,
} }
impl<D> UpdateActorHandleImpl<D> impl UpdateActorHandleImpl {
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
pub fn new<I>( pub fn new<I>(
index_handle: I, index_handle: I,
path: impl AsRef<Path>, path: impl AsRef<Path>,
update_store_size: usize, update_store_size: usize,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where
I: IndexActorHandle + Clone + Send + Sync + 'static, I: IndexActorHandle + Clone + Sync + Send +'static,
{ {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?; let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?;
tokio::task::spawn(actor.run()); tokio::task::spawn_local(actor.run());
Ok(Self { sender }) Ok(Self { sender })
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D> impl UpdateActorHandle for UpdateActorHandleImpl {
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
type Data = D;
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret }; let msg = UpdateMsg::ListUpdates { uuid, ret };
@ -86,15 +78,13 @@ where
async fn update( async fn update(
&self, &self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid, uuid: Uuid,
update: Update,
) -> Result<UpdateStatus> { ) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update { let msg = UpdateMsg::Update {
uuid, uuid,
data, update,
meta,
ret, ret,
}; };
self.sender.send(msg).await?; self.sender.send(msg).await?;

View File

@ -1,17 +1,16 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
use super::error::Result; use super::error::Result;
use super::{PayloadData, UpdateMeta, UpdateStatus, UpdateStoreInfo}; use super::{UpdateStatus, UpdateStoreInfo, Update};
pub enum UpdateMsg<D> { pub enum UpdateMsg {
Update { Update {
uuid: Uuid, uuid: Uuid,
meta: UpdateMeta, update: Update,
data: mpsc::Receiver<PayloadData<D>>,
ret: oneshot::Sender<Result<UpdateStatus>>, ret: oneshot::Sender<Result<UpdateStatus>>,
}, },
ListUpdates { ListUpdates {

View File

@ -1,10 +1,11 @@
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};
use actix_web::error::PayloadError; use milli::update::IndexDocumentsMethod;
use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use serde::{Serialize, Deserialize};
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::UpdateStatus;
use super::Update;
use actor::UpdateActor; use actor::UpdateActor;
use error::Result; use error::Result;
@ -19,16 +20,21 @@ mod handle_impl;
mod message; mod message;
pub mod store; pub mod store;
type PayloadData<D> = std::result::Result<D, PayloadError>; #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RegisterUpdate {
DocumentAddition {
primary_key: Option<String>,
method: IndexDocumentsMethod,
content_uuid: Uuid,
}
}
#[cfg(test)] #[cfg(test)]
use mockall::automock; use mockall::automock;
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle { pub trait UpdateActorHandle {
type Data: AsRef<[u8]> + Sized + 'static + Sync + Send;
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>; async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>; async fn delete(&self, uuid: Uuid) -> Result<()>;
@ -37,8 +43,7 @@ pub trait UpdateActorHandle {
async fn get_info(&self) -> Result<UpdateStoreInfo>; async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn update( async fn update(
&self, &self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid, uuid: Uuid,
update: Update,
) -> Result<UpdateStatus>; ) -> Result<UpdateStatus>;
} }

View File

@ -1,17 +1,17 @@
use std::{ use std::{
collections::HashSet, collections::HashSet,
fs::{create_dir_all, File}, fs::{create_dir_all, File},
io::{BufRead, BufReader, Write}, io::Write,
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use heed::{EnvOpenOptions, RoTxn}; use heed::RoTxn;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use super::{Result, State, UpdateStore}; use super::{Result, State, UpdateStore};
use crate::index_controller::{ use crate::index_controller::{
index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued, index_actor::IndexActorHandle,
UpdateStatus, UpdateStatus,
}; };
@ -67,35 +67,36 @@ impl UpdateStore {
fn dump_pending( fn dump_pending(
&self, &self,
txn: &RoTxn, _txn: &RoTxn,
uuids: &HashSet<Uuid>, _uuids: &HashSet<Uuid>,
mut file: &mut File, _file: &mut File,
dst_path: impl AsRef<Path>, _dst_path: impl AsRef<Path>,
) -> Result<()> { ) -> Result<()> {
let pendings = self.pending_queue.iter(txn)?.lazily_decode_data(); todo!()
//let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
for pending in pendings { //for pending in pendings {
let ((_, uuid, _), data) = pending?; //let ((_, uuid, _), data) = pending?;
if uuids.contains(&uuid) { //if uuids.contains(&uuid) {
let update = data.decode()?; //let update = data.decode()?;
if let Some(ref update_uuid) = update.content { //if let Some(ref update_uuid) = update.content {
let src = super::update_uuid_to_file_path(&self.path, *update_uuid); //let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid); //let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
std::fs::copy(src, dst)?; //std::fs::copy(src, dst)?;
} //}
let update_json = UpdateEntry { //let update_json = UpdateEntry {
uuid, //uuid,
update: update.into(), //update: update.into(),
}; //};
serde_json::to_writer(&mut file, &update_json)?; //serde_json::to_writer(&mut file, &update_json)?;
file.write_all(b"\n")?; //file.write_all(b"\n")?;
} //}
} //}
Ok(()) //Ok(())
} }
fn dump_completed( fn dump_completed(
@ -122,52 +123,53 @@ impl UpdateStore {
} }
pub fn load_dump( pub fn load_dump(
src: impl AsRef<Path>, _src: impl AsRef<Path>,
dst: impl AsRef<Path>, _dst: impl AsRef<Path>,
db_size: usize, _db_size: usize,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let dst_update_path = dst.as_ref().join("updates/"); todo!()
create_dir_all(&dst_update_path)?; //let dst_update_path = dst.as_ref().join("updates/");
//create_dir_all(&dst_update_path)?;
let mut options = EnvOpenOptions::new(); //let mut options = EnvOpenOptions::new();
options.map_size(db_size as usize); //options.map_size(db_size as usize);
let (store, _) = UpdateStore::new(options, &dst_update_path)?; //let (store, _) = UpdateStore::new(options, &dst_update_path)?;
let src_update_path = src.as_ref().join("updates"); //let src_update_path = src.as_ref().join("updates");
let update_data = File::open(&src_update_path.join("data.jsonl"))?; //let update_data = File::open(&src_update_path.join("data.jsonl"))?;
let mut update_data = BufReader::new(update_data); //let mut update_data = BufReader::new(update_data);
std::fs::create_dir_all(dst_update_path.join("update_files/"))?; //std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
let mut wtxn = store.env.write_txn()?; //let mut wtxn = store.env.write_txn()?;
let mut line = String::new(); //let mut line = String::new();
loop { //loop {
match update_data.read_line(&mut line) { //match update_data.read_line(&mut line) {
Ok(0) => break, //Ok(0) => break,
Ok(_) => { //Ok(_) => {
let UpdateEntry { uuid, update } = serde_json::from_str(&line)?; //let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
store.register_raw_updates(&mut wtxn, &update, uuid)?; //store.register_raw_updates(&mut wtxn, &update, uuid)?;
// Copy ascociated update path if it exists //// Copy ascociated update path if it exists
if let UpdateStatus::Enqueued(Enqueued { //if let UpdateStatus::Enqueued(Enqueued {
content: Some(uuid), //content: Some(uuid),
.. //..
}) = update //}) = update
{ //{
let src = update_uuid_to_file_path(&src_update_path, uuid); //let src = update_uuid_to_file_path(&src_update_path, uuid);
let dst = update_uuid_to_file_path(&dst_update_path, uuid); //let dst = update_uuid_to_file_path(&dst_update_path, uuid);
std::fs::copy(src, dst)?; //std::fs::copy(src, dst)?;
} //}
} //}
_ => break, //_ => break,
} //}
line.clear(); //line.clear();
} //}
wtxn.commit()?; //wtxn.commit()?;
Ok(()) //Ok(())
} }
} }

View File

@ -1,7 +1,7 @@
mod codec; mod codec;
pub mod dump; pub mod dump;
use std::fs::{copy, create_dir_all, remove_file, File}; use std::fs::{create_dir_all, remove_file};
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
@ -26,9 +26,10 @@ use uuid::Uuid;
use codec::*; use codec::*;
use super::RegisterUpdate;
use super::error::Result; use super::error::Result;
use super::UpdateMeta;
use crate::helpers::EnvSizer; use crate::helpers::EnvSizer;
use crate::index_controller::update_files_path;
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
@ -116,7 +117,9 @@ impl UpdateStore {
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
options.max_dbs(5); options.max_dbs(5);
let env = options.open(&path)?; let update_path = path.as_ref().join("updates");
std::fs::create_dir_all(&update_path)?;
let env = options.open(update_path)?;
let pending_queue = env.create_database(Some("pending-queue"))?; let pending_queue = env.create_database(Some("pending-queue"))?;
let next_update_id = env.create_database(Some("next-update-id"))?; let next_update_id = env.create_database(Some("next-update-id"))?;
let updates = env.create_database(Some("updates"))?; let updates = env.create_database(Some("updates"))?;
@ -157,7 +160,7 @@ impl UpdateStore {
// want to close the index. // want to close the index.
let duration = Duration::from_secs(10 * 60); // 10 minutes let duration = Duration::from_secs(10 * 60); // 10 minutes
let update_store_weak = Arc::downgrade(&update_store); let update_store_weak = Arc::downgrade(&update_store);
tokio::task::spawn(async move { tokio::task::spawn_local(async move {
// Block and wait for something to process with a timeout. The timeout // Block and wait for something to process with a timeout. The timeout
// function returns a Result and we must just unlock the loop on Result. // function returns a Result and we must just unlock the loop on Result.
'outer: while timeout(duration, notification_receiver.recv()) 'outer: while timeout(duration, notification_receiver.recv())
@ -233,14 +236,12 @@ impl UpdateStore {
/// into the pending-meta store. Returns the new unique update id. /// into the pending-meta store. Returns the new unique update id.
pub fn register_update( pub fn register_update(
&self, &self,
meta: UpdateMeta,
content: Option<Uuid>,
index_uuid: Uuid, index_uuid: Uuid,
update: RegisterUpdate,
) -> heed::Result<Enqueued> { ) -> heed::Result<Enqueued> {
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
let meta = Enqueued::new(meta, update_id, content); let meta = Enqueued::new(update, update_id);
self.pending_queue self.pending_queue
.put(&mut txn, &(global_id, index_uuid, update_id), &meta)?; .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@ -254,30 +255,30 @@ impl UpdateStore {
Ok(meta) Ok(meta)
} }
/// Push already processed update in the UpdateStore without triggering the notification // /// Push already processed update in the UpdateStore without triggering the notification
/// process. This is useful for the dumps. // /// process. This is useful for the dumps.
pub fn register_raw_updates( //pub fn register_raw_updates(
&self, //&self,
wtxn: &mut heed::RwTxn, //wtxn: &mut heed::RwTxn,
update: &UpdateStatus, //update: &UpdateStatus,
index_uuid: Uuid, //index_uuid: Uuid,
) -> heed::Result<()> { //) -> heed::Result<()> {
match update { //match update {
UpdateStatus::Enqueued(enqueued) => { //UpdateStatus::Enqueued(enqueued) => {
let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?; //let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
self.pending_queue.remap_key_type::<PendingKeyCodec>().put( //self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
wtxn, //wtxn,
&(global_id, index_uuid, enqueued.id()), //&(global_id, index_uuid, enqueued.id()),
enqueued, //enqueued,
)?; //)?;
} //}
_ => { //_ => {
let _update_id = self.next_update_id_raw(wtxn, index_uuid)?; //let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
self.updates.put(wtxn, &(index_uuid, update.id()), update)?; //self.updates.put(wtxn, &(index_uuid, update.id()), update)?;
} //}
} //}
Ok(()) //Ok(())
} //}
/// Executes the user provided function on the next pending update (the one with the lowest id). /// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and /// This is asynchronous as it let the user process the update with a read-only txn and
@ -291,8 +292,7 @@ impl UpdateStore {
// If there is a pending update we process and only keep // If there is a pending update we process and only keep
// a reader while processing it, not a writer. // a reader while processing it, not a writer.
match first_meta { match first_meta {
Some(((global_id, index_uuid, _), mut pending)) => { Some(((global_id, index_uuid, _), pending)) => {
let content = pending.content.take();
let processing = pending.processing(); let processing = pending.processing();
// Acquire the state lock and set the current state to processing. // Acquire the state lock and set the current state to processing.
// txn must *always* be acquired after state lock, or it will dead lock. // txn must *always* be acquired after state lock, or it will dead lock.
@ -300,7 +300,7 @@ impl UpdateStore {
state.swap(State::Processing(index_uuid, processing.clone())); state.swap(State::Processing(index_uuid, processing.clone()));
let result = let result =
self.perform_update(content, processing, index_handle, index_uuid, global_id); self.perform_update(processing, index_handle, index_uuid, global_id);
state.swap(State::Idle); state.swap(State::Idle);
@ -312,27 +312,16 @@ impl UpdateStore {
fn perform_update( fn perform_update(
&self, &self,
content: Option<Uuid>,
processing: Processing, processing: Processing,
index_handle: impl IndexActorHandle, index_handle: impl IndexActorHandle,
index_uuid: Uuid, index_uuid: Uuid,
global_id: u64, global_id: u64,
) -> Result<Option<()>> { ) -> Result<Option<()>> {
let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid));
let update_id = processing.id();
let file = match content_path {
Some(ref path) => {
let file = File::open(path)?;
Some(file)
}
None => None,
};
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let handle = Handle::current(); let handle = Handle::current();
let update_id = processing.id();
let result = let result =
match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { match handle.block_on(index_handle.update(index_uuid, processing.clone())) {
Ok(result) => result, Ok(result) => result,
Err(e) => Err(processing.fail(e.into())), Err(e) => Err(processing.fail(e.into())),
}; };
@ -354,10 +343,6 @@ impl UpdateStore {
wtxn.commit()?; wtxn.commit()?;
if let Some(ref path) = content_path {
remove_file(&path)?;
}
Ok(Some(())) Ok(Some(()))
} }
@ -435,16 +420,16 @@ impl UpdateStore {
pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> { pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
// Contains all the content file paths that we need to be removed if the deletion was successful. // Contains all the content file paths that we need to be removed if the deletion was successful.
let mut uuids_to_remove = Vec::new(); let uuids_to_remove = Vec::new();
let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data(); let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
while let Some(Ok(((_, uuid, _), pending))) = pendings.next() { while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
if uuid == index_uuid { if uuid == index_uuid {
let mut pending = pending.decode()?; let mut _pending = pending.decode()?;
if let Some(update_uuid) = pending.content.take() { //if let Some(update_uuid) = pending.content.take() {
uuids_to_remove.push(update_uuid); //uuids_to_remove.push(update_uuid);
} //}
// Invariant check: we can only delete the current entry when we don't hold // Invariant check: we can only delete the current entry when we don't hold
// references to it anymore. This must be done after we have retrieved its content. // references to it anymore. This must be done after we have retrieved its content.
@ -486,7 +471,7 @@ impl UpdateStore {
// them. // them.
uuids_to_remove uuids_to_remove
.iter() .iter()
.map(|uuid| update_uuid_to_file_path(&self.path, *uuid)) .map(|uuid: &Uuid| update_files_path(&self.path).join(uuid.to_string()))
.for_each(|path| { .for_each(|path| {
let _ = remove_file(path); let _ = remove_file(path);
}); });
@ -521,17 +506,17 @@ impl UpdateStore {
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
for entry in pendings { for entry in pendings {
let ((_, uuid, _), pending) = entry?; let ((_, _uuid, _), _pending) = entry?;
if uuids.contains(&uuid) { //if uuids.contains(&uuid) {
if let Enqueued { //if let Enqueued {
content: Some(uuid), //content: Some(uuid),
.. //..
} = pending.decode()? //} = pending.decode()?
{ //{
let path = update_uuid_to_file_path(&self.path, uuid); //let path = update_uuid_to_file_path(&self.path, uuid);
copy(path, &update_files_path)?; //copy(path, &update_files_path)?;
} //}
} //}
} }
let path = &path.as_ref().to_path_buf(); let path = &path.as_ref().to_path_buf();
@ -553,18 +538,18 @@ impl UpdateStore {
} }
pub fn get_info(&self) -> Result<UpdateStoreInfo> { pub fn get_info(&self) -> Result<UpdateStoreInfo> {
let mut size = self.env.size(); let size = self.env.size();
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
for entry in self.pending_queue.iter(&txn)? { for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?; let (_, _pending) = entry?;
if let Enqueued { //if let Enqueued {
content: Some(uuid), //content: Some(uuid),
.. //..
} = pending //} = pending
{ //{
let path = update_uuid_to_file_path(&self.path, uuid); //let path = update_uuid_to_file_path(&self.path, uuid);
size += File::open(path)?.metadata()?.len(); //size += File::open(path)?.metadata()?.len();
} //}
} }
let processing = match *self.state.read() { let processing = match *self.state.read() {
State::Processing(uuid, _) => Some(uuid), State::Processing(uuid, _) => Some(uuid),
@ -575,12 +560,6 @@ impl UpdateStore {
} }
} }
fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
root.as_ref()
.join(UPDATE_DIR)
.join(format!("update_{}", uuid))
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;

View File

@ -0,0 +1,63 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use std::ops::{Deref, DerefMut};
use tempfile::NamedTempFile;
use uuid::Uuid;
use super::error::Result;
pub struct UpdateFile {
path: PathBuf,
file: NamedTempFile,
}
impl UpdateFile {
pub fn persist(self) {
println!("persisting in {}", self.path.display());
self.file.persist(&self.path).unwrap();
}
}
impl Deref for UpdateFile {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for UpdateFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[derive(Clone, Debug)]
pub struct UpdateFileStore {
path: PathBuf,
}
impl UpdateFileStore {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join("updates/updates_files");
std::fs::create_dir_all(&path).unwrap();
Ok(Self { path })
}
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
let file = NamedTempFile::new().unwrap();
let uuid = Uuid::new_v4();
let path = self.path.join(uuid.to_string());
let update_file = UpdateFile { file, path };
Ok((uuid, update_file))
}
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
let path = self.path.join(uuid.to_string());
println!("reading in {}", path.display());
let file = File::open(path).unwrap();
Ok(file)
}
}

View File

@ -1,13 +1,14 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{ use crate::{
error::ResponseError, error::ResponseError,
index::{Settings, Unchecked}, index::{Settings, Unchecked},
}; };
use super::update_actor::RegisterUpdate;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult { pub enum UpdateResult {
DocumentsAddition(DocumentAdditionResult), DocumentsAddition(DocumentAdditionResult),
@ -21,7 +22,6 @@ pub enum UpdateResult {
pub enum UpdateMeta { pub enum UpdateMeta {
DocumentsAddition { DocumentsAddition {
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat,
primary_key: Option<String>, primary_key: Option<String>,
}, },
ClearDocuments, ClearDocuments,
@ -35,18 +35,16 @@ pub enum UpdateMeta {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Enqueued { pub struct Enqueued {
pub update_id: u64, pub update_id: u64,
pub meta: UpdateMeta, pub meta: RegisterUpdate,
pub enqueued_at: DateTime<Utc>, pub enqueued_at: DateTime<Utc>,
pub content: Option<Uuid>,
} }
impl Enqueued { impl Enqueued {
pub fn new(meta: UpdateMeta, update_id: u64, content: Option<Uuid>) -> Self { pub fn new(meta: RegisterUpdate, update_id: u64) -> Self {
Self { Self {
enqueued_at: Utc::now(), enqueued_at: Utc::now(),
meta, meta,
update_id, update_id,
content,
} }
} }
@ -64,7 +62,7 @@ impl Enqueued {
} }
} }
pub fn meta(&self) -> &UpdateMeta { pub fn meta(&self) -> &RegisterUpdate {
&self.meta &self.meta
} }
@ -87,7 +85,7 @@ impl Processed {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &UpdateMeta { pub fn meta(&self) -> &RegisterUpdate {
self.from.meta() self.from.meta()
} }
} }
@ -105,7 +103,7 @@ impl Processing {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &UpdateMeta { pub fn meta(&self) -> &RegisterUpdate {
self.from.meta() self.from.meta()
} }
@ -139,7 +137,7 @@ impl Aborted {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &UpdateMeta { pub fn meta(&self) -> &RegisterUpdate {
self.from.meta() self.from.meta()
} }
} }
@ -158,7 +156,7 @@ impl Failed {
self.from.id() self.from.id()
} }
pub fn meta(&self) -> &UpdateMeta { pub fn meta(&self) -> &RegisterUpdate {
self.from.meta() self.from.meta()
} }
} }
@ -184,7 +182,7 @@ impl UpdateStatus {
} }
} }
pub fn meta(&self) -> &UpdateMeta { pub fn meta(&self) -> &RegisterUpdate {
match self { match self {
UpdateStatus::Processing(u) => u.meta(), UpdateStatus::Processing(u) => u.meta(),
UpdateStatus::Enqueued(u) => u.meta(), UpdateStatus::Enqueued(u) => u.meta(),

View File

@ -1,7 +1,6 @@
use std::env; use std::env;
use actix_web::HttpServer; use actix_web::HttpServer;
use main_error::MainError;
use meilisearch_http::{create_app, Data, Opt}; use meilisearch_http::{create_app, Data, Opt};
use structopt::StructOpt; use structopt::StructOpt;
@ -12,10 +11,7 @@ use meilisearch_http::analytics;
#[global_allocator] #[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[actix_web::main] fn setup(opt: &Opt) -> anyhow::Result<()> {
async fn main() -> Result<(), MainError> {
let opt = Opt::from_args();
let mut log_builder = env_logger::Builder::new(); let mut log_builder = env_logger::Builder::new();
log_builder.parse_filters(&opt.log_level); log_builder.parse_filters(&opt.log_level);
if opt.log_level == "info" { if opt.log_level == "info" {
@ -25,13 +21,34 @@ async fn main() -> Result<(), MainError> {
log_builder.init(); log_builder.init();
// Set the tempfile directory in the current db path, to avoid cross device references. Also
// remove the previous outstanding files found there
//
// TODO: if two processes open the same db, one might delete the other tmpdir. Need to make
// sure that no one is using it before deleting it.
let temp_path = opt.db_path.join("tmp");
// Ignore error if tempdir doesn't exist
let _ = std::fs::remove_dir_all(&temp_path);
std::fs::create_dir_all(&temp_path)?;
if cfg!(windows) {
std::env::set_var("TMP", temp_path);
} else {
std::env::set_var("TMPDIR", temp_path);
}
Ok(())
}
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
setup(&opt)?;
match opt.env.as_ref() { match opt.env.as_ref() {
"production" => { "production" => {
if opt.master_key.is_none() { if opt.master_key.is_none() {
return Err( anyhow::bail!("In production mode, the environment variable MEILI_MASTER_KEY is mandatory")
"In production mode, the environment variable MEILI_MASTER_KEY is mandatory"
.into(),
);
} }
} }
"development" => (), "development" => (),
@ -54,7 +71,7 @@ async fn main() -> Result<(), MainError> {
Ok(()) Ok(())
} }
async fn run_http(data: Data, opt: Opt) -> Result<(), Box<dyn std::error::Error>> { async fn run_http(data: Data, opt: Opt) -> anyhow::Result<()> {
let _enable_dashboard = &opt.env == "development"; let _enable_dashboard = &opt.env == "development";
let http_server = HttpServer::new(move || create_app!(data, _enable_dashboard)) let http_server = HttpServer::new(move || create_app!(data, _enable_dashboard))
// Disable signals allows the server to terminate immediately when a user enter CTRL-C // Disable signals allows the server to terminate immediately when a user enter CTRL-C

View File

@ -5,7 +5,7 @@ use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::{error, fs}; use std::fs;
use byte_unit::Byte; use byte_unit::Byte;
use milli::CompressionType; use milli::CompressionType;
@ -184,7 +184,7 @@ pub struct Opt {
} }
impl Opt { impl Opt {
pub fn get_ssl_config(&self) -> Result<Option<rustls::ServerConfig>, Box<dyn error::Error>> { pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {
if let (Some(cert_path), Some(key_path)) = (&self.ssl_cert_path, &self.ssl_key_path) { if let (Some(cert_path), Some(key_path)) = (&self.ssl_cert_path, &self.ssl_key_path) {
let client_auth = match &self.ssl_auth_path { let client_auth = match &self.ssl_auth_path {
Some(auth_path) => { Some(auth_path) => {
@ -210,7 +210,7 @@ impl Opt {
let ocsp = load_ocsp(&self.ssl_ocsp_path)?; let ocsp = load_ocsp(&self.ssl_ocsp_path)?;
config config
.set_single_cert_with_ocsp_and_sct(certs, privkey, ocsp, vec![]) .set_single_cert_with_ocsp_and_sct(certs, privkey, ocsp, vec![])
.map_err(|_| "bad certificates/private key")?; .map_err(|_| anyhow::anyhow!("bad certificates/private key"))?;
if self.ssl_resumption { if self.ssl_resumption {
config.set_persistence(rustls::ServerSessionMemoryCache::new(256)); config.set_persistence(rustls::ServerSessionMemoryCache::new(256));
@ -284,25 +284,25 @@ fn total_memory_bytes() -> Option<u64> {
} }
} }
fn load_certs(filename: PathBuf) -> Result<Vec<rustls::Certificate>, Box<dyn error::Error>> { fn load_certs(filename: PathBuf) -> anyhow::Result<Vec<rustls::Certificate>> {
let certfile = fs::File::open(filename).map_err(|_| "cannot open certificate file")?; let certfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open certificate file"))?;
let mut reader = BufReader::new(certfile); let mut reader = BufReader::new(certfile);
Ok(certs(&mut reader).map_err(|_| "cannot read certificate file")?) Ok(certs(&mut reader).map_err(|_| anyhow::anyhow!("cannot read certificate file"))?)
} }
fn load_private_key(filename: PathBuf) -> Result<rustls::PrivateKey, Box<dyn error::Error>> { fn load_private_key(filename: PathBuf) -> anyhow::Result<rustls::PrivateKey> {
let rsa_keys = { let rsa_keys = {
let keyfile = let keyfile =
fs::File::open(filename.clone()).map_err(|_| "cannot open private key file")?; fs::File::open(filename.clone()).map_err(|_| anyhow::anyhow!("cannot open private key file"))?;
let mut reader = BufReader::new(keyfile); let mut reader = BufReader::new(keyfile);
rsa_private_keys(&mut reader).map_err(|_| "file contains invalid rsa private key")? rsa_private_keys(&mut reader).map_err(|_| anyhow::anyhow!("file contains invalid rsa private key"))?
}; };
let pkcs8_keys = { let pkcs8_keys = {
let keyfile = fs::File::open(filename).map_err(|_| "cannot open private key file")?; let keyfile = fs::File::open(filename).map_err(|_| anyhow::anyhow!("cannot open private key file"))?;
let mut reader = BufReader::new(keyfile); let mut reader = BufReader::new(keyfile);
pkcs8_private_keys(&mut reader) pkcs8_private_keys(&mut reader)
.map_err(|_| "file contains invalid pkcs8 private key (encrypted keys not supported)")? .map_err(|_| anyhow::anyhow!("file contains invalid pkcs8 private key (encrypted keys not supported)"))?
}; };
// prefer to load pkcs8 keys // prefer to load pkcs8 keys
@ -314,14 +314,14 @@ fn load_private_key(filename: PathBuf) -> Result<rustls::PrivateKey, Box<dyn err
} }
} }
fn load_ocsp(filename: &Option<PathBuf>) -> Result<Vec<u8>, Box<dyn error::Error>> { fn load_ocsp(filename: &Option<PathBuf>) -> anyhow::Result<Vec<u8>> {
let mut ret = Vec::new(); let mut ret = Vec::new();
if let Some(ref name) = filename { if let Some(ref name) = filename {
fs::File::open(name) fs::File::open(name)
.map_err(|_| "cannot open ocsp file")? .map_err(|_| anyhow::anyhow!("cannot open ocsp file"))?
.read_to_end(&mut ret) .read_to_end(&mut ret)
.map_err(|_| "cannot read oscp file")?; .map_err(|_| anyhow::anyhow!("cannot read oscp file"))?;
} }
Ok(ret) Ok(ret)

View File

@ -1,12 +1,17 @@
use actix_web::error::PayloadError;
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use actix_web::web::Bytes;
use futures::{Stream, StreamExt};
use log::debug; use log::debug;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::IndexDocumentsMethod;
use serde::Deserialize; use serde::Deserialize;
use serde_json::Value; //use serde_json::Value;
use tokio::sync::mpsc;
use crate::error::ResponseError; use crate::error::ResponseError;
use crate::extractors::authentication::{policies::*, GuardedData}; use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::payload::Payload; use crate::extractors::payload::Payload;
use crate::index_controller::{DocumentAdditionFormat, Update};
use crate::routes::IndexParam; use crate::routes::IndexParam;
use crate::Data; use crate::Data;
@ -32,6 +37,17 @@ macro_rules! guard_content_type {
guard_content_type!(guard_json, "application/json"); guard_content_type!(guard_json, "application/json");
*/ */
/// This is required because Payload is not Sync nor Send
fn payload_to_stream(mut payload: Payload) -> impl Stream<Item=Result<Bytes, PayloadError>> {
let (snd, recv) = mpsc::channel(1);
tokio::task::spawn_local(async move {
while let Some(data) = payload.next().await {
let _ = snd.send(data).await;
}
});
tokio_stream::wrappers::ReceiverStream::new(recv)
}
fn guard_json(head: &actix_web::dev::RequestHead) -> bool { fn guard_json(head: &actix_web::dev::RequestHead) -> bool {
if let Some(_content_type) = head.headers.get("Content-Type") { if let Some(_content_type) = head.headers.get("Content-Type") {
// CURRENTLY AND FOR THIS RELEASE ONLY WE DECIDED TO INTERPRET ALL CONTENT-TYPES AS JSON // CURRENTLY AND FOR THIS RELEASE ONLY WE DECIDED TO INTERPRET ALL CONTENT-TYPES AS JSON
@ -60,14 +76,14 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.route(web::get().to(get_all_documents)) .route(web::get().to(get_all_documents))
.route(web::post().guard(guard_json).to(add_documents)) .route(web::post().guard(guard_json).to(add_documents))
.route(web::put().guard(guard_json).to(update_documents)) .route(web::put().guard(guard_json).to(update_documents))
.route(web::delete().to(clear_all_documents)), //.route(web::delete().to(clear_all_documents)),
) )
// this route needs to be before the /documents/{document_id} to match properly // this route needs to be before the /documents/{document_id} to match properly
.service(web::resource("/delete-batch").route(web::post().to(delete_documents))) //.service(web::resource("/delete-batch").route(web::post().to(delete_documents)))
.service( .service(
web::resource("/{document_id}") web::resource("/{document_id}")
.route(web::get().to(get_document)) .route(web::get().to(get_document))
.route(web::delete().to(delete_document)), //.route(web::delete().to(delete_document)),
); );
} }
@ -84,16 +100,16 @@ pub async fn get_document(
Ok(HttpResponse::Ok().json(document)) Ok(HttpResponse::Ok().json(document))
} }
pub async fn delete_document( //pub async fn delete_document(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
path: web::Path<DocumentParam>, //path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
let update_status = data //let update_status = data
.delete_documents(path.index_uid.clone(), vec![path.document_id.clone()]) //.delete_documents(path.index_uid.clone(), vec![path.document_id.clone()])
.await?; //.await?;
debug!("returns: {:?}", update_status); //debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
} //}
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)] #[serde(rename_all = "camelCase", deny_unknown_fields)]
@ -147,14 +163,14 @@ pub async fn add_documents(
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params); debug!("called with params: {:?}", params);
let update = Update::DocumentAddition {
payload: Box::new(payload_to_stream(body)),
primary_key: params.primary_key.clone(),
method: IndexDocumentsMethod::ReplaceDocuments,
format: DocumentAdditionFormat::Json,
};
let update_status = data let update_status = data
.add_documents( .register_update(path.index_uid.as_str(), update)
path.into_inner().index_uid,
IndexDocumentsMethod::ReplaceDocuments,
UpdateFormat::Json,
body,
params.primary_key.clone(),
)
.await?; .await?;
debug!("returns: {:?}", update_status); debug!("returns: {:?}", update_status);
@ -170,45 +186,45 @@ pub async fn update_documents(
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", params); debug!("called with params: {:?}", params);
let update = data let update = Update::DocumentAddition {
.add_documents( payload: Box::new(payload_to_stream(body)),
path.into_inner().index_uid, primary_key: params.primary_key.clone(),
IndexDocumentsMethod::UpdateDocuments, method: IndexDocumentsMethod::UpdateDocuments,
UpdateFormat::Json, format: DocumentAdditionFormat::Json,
body, };
params.primary_key.clone(), let update_status = data
) .register_update(path.index_uid.as_str(), update)
.await?; .await?;
debug!("returns: {:?}", update);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update.id() })))
}
pub async fn delete_documents(
data: GuardedData<Private, Data>,
path: web::Path<IndexParam>,
body: web::Json<Vec<Value>>,
) -> Result<HttpResponse, ResponseError> {
debug!("called with params: {:?}", body);
let ids = body
.iter()
.map(|v| {
v.as_str()
.map(String::from)
.unwrap_or_else(|| v.to_string())
})
.collect();
let update_status = data.delete_documents(path.index_uid.clone(), ids).await?;
debug!("returns: {:?}", update_status); debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
} }
pub async fn clear_all_documents( //pub async fn delete_documents(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
path: web::Path<IndexParam>, //path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { //body: web::Json<Vec<Value>>,
let update_status = data.clear_documents(path.index_uid.clone()).await?; //) -> Result<HttpResponse, ResponseError> {
debug!("returns: {:?}", update_status); //debug!("called with params: {:?}", body);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) //let ids = body
} //.iter()
//.map(|v| {
//v.as_str()
//.map(String::from)
//.unwrap_or_else(|| v.to_string())
//})
//.collect();
//let update_status = data.delete_documents(path.index_uid.clone(), ids).await?;
//debug!("returns: {:?}", update_status);
//Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
//}
//pub async fn clear_all_documents(
//data: GuardedData<Private, Data>,
//path: web::Path<IndexParam>,
//) -> Result<HttpResponse, ResponseError> {
//let update_status = data.clear_documents(path.index_uid.clone()).await?;
//debug!("returns: {:?}", update_status);
//Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
//}

View File

@ -17,7 +17,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service( cfg.service(
web::resource("") web::resource("")
.route(web::get().to(list_indexes)) .route(web::get().to(list_indexes))
.route(web::post().to(create_index)), //.route(web::post().to(create_index)),
) )
.service( .service(
web::scope("/{index_uid}") web::scope("/{index_uid}")
@ -25,13 +25,13 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
web::resource("") web::resource("")
.route(web::get().to(get_index)) .route(web::get().to(get_index))
.route(web::put().to(update_index)) .route(web::put().to(update_index))
.route(web::delete().to(delete_index)), //.route(web::delete().to(delete_index)),
) )
.service(web::resource("/stats").route(web::get().to(get_index_stats))) .service(web::resource("/stats").route(web::get().to(get_index_stats)))
.service(web::scope("/documents").configure(documents::configure)) .service(web::scope("/documents").configure(documents::configure))
.service(web::scope("/search").configure(search::configure)) .service(web::scope("/search").configure(search::configure))
.service(web::scope("/updates").configure(updates::configure)) .service(web::scope("/updates").configure(updates::configure))
.service(web::scope("/settings").configure(settings::configure)), //.service(web::scope("/settings").configure(settings::configure)),
); );
} }
@ -48,14 +48,14 @@ pub struct IndexCreateRequest {
primary_key: Option<String>, primary_key: Option<String>,
} }
pub async fn create_index( //pub async fn create_index(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
body: web::Json<IndexCreateRequest>, //body: web::Json<IndexCreateRequest>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
let body = body.into_inner(); //let body = body.into_inner();
let meta = data.create_index(body.uid, body.primary_key).await?; //let meta = data.create_index(body.uid, body.primary_key).await?;
Ok(HttpResponse::Created().json(meta)) //Ok(HttpResponse::Created().json(meta))
} //}
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)] #[serde(rename_all = "camelCase", deny_unknown_fields)]
@ -97,13 +97,13 @@ pub async fn update_index(
Ok(HttpResponse::Ok().json(meta)) Ok(HttpResponse::Ok().json(meta))
} }
pub async fn delete_index( //pub async fn delete_index(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
path: web::Path<IndexParam>, //path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
data.delete_index(path.index_uid.clone()).await?; //data.delete_index(path.index_uid.clone()).await?;
Ok(HttpResponse::NoContent().finish()) //Ok(HttpResponse::NoContent().finish())
} //}
pub async fn get_index_stats( pub async fn get_index_stats(
data: GuardedData<Private, Data>, data: GuardedData<Private, Data>,

View File

@ -1,185 +1,184 @@
use actix_web::{web, HttpResponse}; //use log::debug;
use log::debug;
use crate::extractors::authentication::{policies::*, GuardedData}; //use crate::extractors::authentication::{policies::*, GuardedData};
use crate::index::Settings; //use crate::index::Settings;
use crate::Data; //use crate::Data;
use crate::{error::ResponseError, index::Unchecked}; //use crate::error::ResponseError;
#[macro_export] //#[macro_export]
macro_rules! make_setting_route { //macro_rules! make_setting_route {
($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal) => { //($route:literal, $type:ty, $attr:ident, $camelcase_attr:literal) => {
pub mod $attr { //pub mod $attr {
use log::debug; //use log::debug;
use actix_web::{web, HttpResponse, Resource}; //use actix_web::{web, HttpResponse, Resource};
use milli::update::Setting; //use milli::update::Setting;
use crate::data; //use crate::data;
use crate::error::ResponseError; //use crate::error::ResponseError;
use crate::index::Settings; //use crate::index::Settings;
use crate::extractors::authentication::{GuardedData, policies::*}; //use crate::extractors::authentication::{GuardedData, policies::*};
pub async fn delete( //pub async fn delete(
data: GuardedData<Private, data::Data>, //data: GuardedData<Private, data::Data>,
index_uid: web::Path<String>, //index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
use crate::index::Settings; //use crate::index::Settings;
let settings = Settings { //let settings = Settings {
$attr: Setting::Reset, //$attr: Setting::Reset,
..Default::default() //..Default::default()
}; //};
let update_status = data.update_settings(index_uid.into_inner(), settings, false).await?; //let update_status = data.update_settings(index_uid.into_inner(), settings, false).await?;
debug!("returns: {:?}", update_status); //debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
} //}
pub async fn update( //pub async fn update(
data: GuardedData<Private, data::Data>, //data: GuardedData<Private, data::Data>,
index_uid: actix_web::web::Path<String>, //index_uid: actix_web::web::Path<String>,
body: actix_web::web::Json<Option<$type>>, //body: actix_web::web::Json<Option<$type>>,
) -> std::result::Result<HttpResponse, ResponseError> { //) -> std::result::Result<HttpResponse, ResponseError> {
let settings = Settings { //let settings = Settings {
$attr: match body.into_inner() { //$attr: match body.into_inner() {
Some(inner_body) => Setting::Set(inner_body), //Some(inner_body) => Setting::Set(inner_body),
None => Setting::Reset //None => Setting::Reset
}, //},
..Default::default() //..Default::default()
}; //};
let update_status = data.update_settings(index_uid.into_inner(), settings, true).await?; //let update_status = data.update_settings(index_uid.into_inner(), settings, true).await?;
debug!("returns: {:?}", update_status); //debug!("returns: {:?}", update_status);
Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) //Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })))
} //}
pub async fn get( //pub async fn get(
data: GuardedData<Private, data::Data>, //data: GuardedData<Private, data::Data>,
index_uid: actix_web::web::Path<String>, //index_uid: actix_web::web::Path<String>,
) -> std::result::Result<HttpResponse, ResponseError> { //) -> std::result::Result<HttpResponse, ResponseError> {
let settings = data.settings(index_uid.into_inner()).await?; //let settings = data.settings(index_uid.into_inner()).await?;
debug!("returns: {:?}", settings); //debug!("returns: {:?}", settings);
let mut json = serde_json::json!(&settings); //let mut json = serde_json::json!(&settings);
let val = json[$camelcase_attr].take(); //let val = json[$camelcase_attr].take();
Ok(HttpResponse::Ok().json(val)) //Ok(HttpResponse::Ok().json(val))
} //}
pub fn resources() -> Resource { //pub fn resources() -> Resource {
Resource::new($route) //Resource::new($route)
.route(web::get().to(get)) //.route(web::get().to(get))
.route(web::post().to(update)) //.route(web::post().to(update))
.route(web::delete().to(delete)) //.route(web::delete().to(delete))
} //}
} //}
}; //};
} //}
make_setting_route!( //make_setting_route!(
"/filterable-attributes", //"/filterable-attributes",
std::collections::BTreeSet<String>, //std::collections::BTreeSet<String>,
filterable_attributes, //filterable_attributes,
"filterableAttributes" //"filterableAttributes"
); //);
make_setting_route!( //make_setting_route!(
"/sortable-attributes", //"/sortable-attributes",
std::collections::BTreeSet<String>, //std::collections::BTreeSet<String>,
sortable_attributes, //sortable_attributes,
"sortableAttributes" //"sortableAttributes"
); //);
make_setting_route!( //make_setting_route!(
"/displayed-attributes", //"/displayed-attributes",
Vec<String>, //Vec<String>,
displayed_attributes, //displayed_attributes,
"displayedAttributes" //"displayedAttributes"
); //);
make_setting_route!( //make_setting_route!(
"/searchable-attributes", //"/searchable-attributes",
Vec<String>, //Vec<String>,
searchable_attributes, //searchable_attributes,
"searchableAttributes" //"searchableAttributes"
); //);
make_setting_route!( //make_setting_route!(
"/stop-words", //"/stop-words",
std::collections::BTreeSet<String>, //std::collections::BTreeSet<String>,
stop_words, //stop_words,
"stopWords" //"stopWords"
); //);
make_setting_route!( //make_setting_route!(
"/synonyms", //"/synonyms",
std::collections::BTreeMap<String, Vec<String>>, //std::collections::BTreeMap<String, Vec<String>>,
synonyms, //synonyms,
"synonyms" //"synonyms"
); //);
make_setting_route!( //make_setting_route!(
"/distinct-attribute", //"/distinct-attribute",
String, //String,
distinct_attribute, //distinct_attribute,
"distinctAttribute" //"distinctAttribute"
); //);
make_setting_route!("/ranking-rules", Vec<String>, ranking_rules, "rankingRules"); //make_setting_route!("/ranking-rules", Vec<String>, ranking_rules, "rankingRules");
macro_rules! generate_configure { //macro_rules! generate_configure {
($($mod:ident),*) => { //($($mod:ident),*) => {
pub fn configure(cfg: &mut web::ServiceConfig) { //pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service( //cfg.service(
web::resource("") //web::resource("")
.route(web::post().to(update_all)) ////.route(web::post().to(update_all))
.route(web::get().to(get_all)) //.route(web::get().to(get_all))
.route(web::delete().to(delete_all))) ////.route(web::delete().to(delete_all)))
$(.service($mod::resources()))*; //$(.service($mod::resources()))*;
} //}
}; //};
} //}
generate_configure!( //generate_configure!(
filterable_attributes, //filterable_attributes,
sortable_attributes, //sortable_attributes,
displayed_attributes, //displayed_attributes,
searchable_attributes, //searchable_attributes,
distinct_attribute, //distinct_attribute,
stop_words, //stop_words,
synonyms, //synonyms,
ranking_rules //ranking_rules
); //);
pub async fn update_all( //pub async fn update_all(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
index_uid: web::Path<String>, //index_uid: web::Path<String>,
body: web::Json<Settings<Unchecked>>, //body: web::Json<Settings<Unchecked>>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
let settings = body.into_inner().check(); //let settings = body.into_inner().check();
let update_result = data //let update_result = data
.update_settings(index_uid.into_inner(), settings, true) //.update_settings(index_uid.into_inner(), settings, true)
.await?; //.await?;
let json = serde_json::json!({ "updateId": update_result.id() }); //let json = serde_json::json!({ "updateId": update_result.id() });
debug!("returns: {:?}", json); //debug!("returns: {:?}", json);
Ok(HttpResponse::Accepted().json(json)) //Ok(HttpResponse::Accepted().json(json))
} //}
pub async fn get_all( //pub async fn get_all(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
index_uid: web::Path<String>, //index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
let settings = data.settings(index_uid.into_inner()).await?; //let settings = data.settings(index_uid.into_inner()).await?;
debug!("returns: {:?}", settings); //debug!("returns: {:?}", settings);
Ok(HttpResponse::Ok().json(settings)) //Ok(HttpResponse::Ok().json(settings))
} //}
pub async fn delete_all( //pub async fn delete_all(
data: GuardedData<Private, Data>, //data: GuardedData<Private, Data>,
index_uid: web::Path<String>, //index_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> { //) -> Result<HttpResponse, ResponseError> {
let settings = Settings::cleared(); //let settings = Settings::cleared();
let update_result = data //let update_result = data
.update_settings(index_uid.into_inner(), settings, false) //.update_settings(index_uid.into_inner(), settings, false)
.await?; //.await?;
let json = serde_json::json!({ "updateId": update_result.id() }); //let json = serde_json::json!({ "updateId": update_result.id() });
debug!("returns: {:?}", json); //debug!("returns: {:?}", json);
Ok(HttpResponse::Accepted().json(json)) //Ok(HttpResponse::Accepted().json(json))
} //}

View File

@ -8,7 +8,8 @@ use serde::{Deserialize, Serialize};
use crate::error::ResponseError; use crate::error::ResponseError;
use crate::extractors::authentication::{policies::*, GuardedData}; use crate::extractors::authentication::{policies::*, GuardedData};
use crate::index::{Settings, Unchecked}; use crate::index::{Settings, Unchecked};
use crate::index_controller::{UpdateMeta, UpdateResult, UpdateStatus}; use crate::index_controller::update_actor::RegisterUpdate;
use crate::index_controller::{UpdateResult, UpdateStatus};
use crate::Data; use crate::Data;
mod dump; mod dump;
@ -50,7 +51,7 @@ impl From<&UpdateStatus> for UpdateType {
fn from(other: &UpdateStatus) -> Self { fn from(other: &UpdateStatus) -> Self {
use milli::update::IndexDocumentsMethod::*; use milli::update::IndexDocumentsMethod::*;
match other.meta() { match other.meta() {
UpdateMeta::DocumentsAddition { method, .. } => { RegisterUpdate::DocumentAddition{ method, .. } => {
let number = match other { let number = match other {
UpdateStatus::Processed(processed) => match processed.success { UpdateStatus::Processed(processed) => match processed.success {
UpdateResult::DocumentsAddition(ref addition) => { UpdateResult::DocumentsAddition(ref addition) => {
@ -67,13 +68,13 @@ impl From<&UpdateStatus> for UpdateType {
_ => unreachable!(), _ => unreachable!(),
} }
} }
UpdateMeta::ClearDocuments => UpdateType::ClearAll, //UpdateMeta::ClearDocuments => UpdateType::ClearAll,
UpdateMeta::DeleteDocuments { ids } => UpdateType::DocumentsDeletion { //UpdateMeta::DeleteDocuments { ids } => UpdateType::DocumentsDeletion {
number: Some(ids.len()), //number: Some(ids.len()),
}, //},
UpdateMeta::Settings(settings) => UpdateType::Settings { //UpdateMeta::Settings(settings) => UpdateType::Settings {
settings: settings.clone(), //settings: settings.clone(),
}, //},
} }
} }
} }