review changes

This commit is contained in:
mpostma 2021-09-29 12:02:27 +02:00
parent 1f537e1b60
commit 8fa6502b16
12 changed files with 43 additions and 118 deletions

View File

@ -7,7 +7,6 @@ use actix_web::http::StatusCode;
use actix_web::HttpResponseBuilder; use actix_web::HttpResponseBuilder;
use aweb::error::{JsonPayloadError, QueryPayloadError}; use aweb::error::{JsonPayloadError, QueryPayloadError};
use meilisearch_error::{Code, ErrorCode}; use meilisearch_error::{Code, ErrorCode};
use meilisearch_lib::milli;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -55,53 +54,6 @@ impl aweb::error::ResponseError for ResponseError {
} }
} }
#[derive(Debug)]
pub struct MilliError<'a>(pub &'a milli::Error);
impl Error for MilliError<'_> {}
impl fmt::Display for MilliError<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl ErrorCode for MilliError<'_> {
fn error_code(&self) -> Code {
use milli::UserError;
match self.0 {
milli::Error::InternalError(_) => Code::Internal,
milli::Error::IoError(_) => Code::Internal,
milli::Error::UserError(ref error) => {
match error {
// TODO: wait for spec for new error codes.
UserError::SerdeJson(_)
| UserError::MaxDatabaseSizeReached
| UserError::InvalidDocumentId { .. }
| UserError::InvalidStoreFile
| UserError::NoSpaceLeftOnDevice
| UserError::DocumentLimitReached => Code::Internal,
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
UserError::InvalidFilter(_) => Code::Filter,
UserError::InvalidFilterAttribute(_) => Code::Filter,
UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
UserError::PrimaryKeyCannotBeChanged => Code::PrimaryKeyAlreadyPresent,
UserError::PrimaryKeyCannotBeReset => Code::PrimaryKeyAlreadyPresent,
UserError::SortRankingRuleMissing => Code::Sort,
UserError::UnknownInternalDocumentId { .. } => Code::DocumentNotFound,
UserError::InvalidFacetsDistribution { .. } => Code::BadRequest,
UserError::InvalidGeoField { .. } => Code::InvalidGeoField,
UserError::InvalidSortableAttribute { .. } => Code::Sort,
UserError::SortError(_) => Code::Sort,
UserError::CriterionError(_) => Code::InvalidRankingRule,
}
}
}
}
}
impl fmt::Display for PayloadError { impl fmt::Display for PayloadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {

View File

@ -96,8 +96,6 @@ pub fn configure_data(config: &mut web::ServiceConfig, data: MeiliSearch, opt: &
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize; let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
config config
.app_data(data) .app_data(data)
// TODO!: Why are we passing the data with two different things?
//.app_data(data)
.app_data( .app_data(
web::JsonConfig::default() web::JsonConfig::default()
.limit(http_payload_size_limit) .limit(http_payload_size_limit)

View File

@ -1,9 +1,9 @@
use std::fs::File; use std::fs::{create_dir_all, File};
use std::io::Write; use std::io::Write;
use std::path::Path; use std::path::Path;
use flate2::{write::GzEncoder, Compression}; use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::Builder; use tar::{Archive, Builder};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> { pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?; let mut f = File::create(dest)?;
@ -16,11 +16,11 @@ pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Resul
Ok(()) Ok(())
} }
//pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> { pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
//let f = File::open(&src)?; let f = File::open(&src)?;
//let gz = GzDecoder::new(f); let gz = GzDecoder::new(f);
//let mut ar = Archive::new(gz); let mut ar = Archive::new(gz);
//create_dir_all(&dest)?; create_dir_all(&dest)?;
//ar.unpack(&dest)?; ar.unpack(&dest)?;
//Ok(()) Ok(())
//} }

View File

@ -206,6 +206,10 @@ impl Index {
result result
})(); })();
if let Update::DocumentAddition { content_uuid, .. } = update.from.meta() {
let _ = self.update_file_store.delete(*content_uuid);
}
match result { match result {
Ok(result) => Ok(update.process(result)), Ok(result) => Ok(update.process(result)),
Err(e) => Err(update.fail(e)), Err(e) => Err(update.fail(e)),

View File

@ -15,8 +15,7 @@ use crate::document_formats::read_ndjson;
use crate::index::apply_settings_to_builder; use crate::index::apply_settings_to_builder;
use crate::index::update_handler::UpdateHandler; use crate::index::update_handler::UpdateHandler;
use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
use crate::index_controller::{self, IndexMetadata}; use crate::index_controller::{self, asc_ranking_rule, desc_ranking_rule, IndexMetadata};
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
use crate::{index::Unchecked, options::IndexerOpts}; use crate::{index::Unchecked, options::IndexerOpts};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]

View File

@ -16,6 +16,7 @@ pub use message::DumpMsg;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::HardStateIndexResolver;
use super::updates::UpdateSender; use super::updates::UpdateSender;
use crate::compression::{from_tar_gz, to_tar_gz};
use crate::index_controller::dump_actor::error::DumpActorError; use crate::index_controller::dump_actor::error::DumpActorError;
use crate::index_controller::updates::UpdateMsg; use crate::index_controller::updates::UpdateMsg;
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
@ -111,7 +112,7 @@ pub fn load_dump(
let tmp_src = tempfile::tempdir()?; let tmp_src = tempfile::tempdir()?;
let tmp_src_path = tmp_src.path(); let tmp_src_path = tmp_src.path();
crate::from_tar_gz(&src_path, tmp_src_path)?; from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME); let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?; let mut meta_file = File::open(&meta_path)?;
@ -172,7 +173,7 @@ impl DumpTask {
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new()?; let temp_dump_file = tempfile::NamedTempFile::new()?;
crate::to_tar_gz(temp_dump_path, temp_dump_file.path()) to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpActorError::Internal(e.into()))?; .map_err(|e| DumpActorError::Internal(e.into()))?;
let dump_path = self.path.join(self.uid).with_extension("dump"); let dump_path = self.path.join(self.uid).with_extension("dump");

View File

@ -143,7 +143,7 @@ where
Some(index) => Ok(index), Some(index) => Ok(index),
None => { None => {
// For some reason we got a uuid to an unexisting index, we return an error, // For some reason we got a uuid to an unexisting index, we return an error,
// and remove the uuid from th uuid store. // and remove the uuid from the uuid store.
let _ = self.index_uuid_store.delete(name.clone()).await; let _ = self.index_uuid_store.delete(name.clone()).await;
Err(IndexResolverError::UnexistingIndex(name)) Err(IndexResolverError::UnexistingIndex(name))
} }

View File

@ -496,13 +496,9 @@ pub fn asc_ranking_rule(text: &str) -> Option<&str> {
.map(|(field, _)| field) .map(|(field, _)| field)
} }
/// Parses the v1 version of the Desc ranking rules `asc(price)`and returns the field name. /// Parses the v1 version of the Desc ranking rules `desc(price)`and returns the field name.
pub fn desc_ranking_rule(text: &str) -> Option<&str> { pub fn desc_ranking_rule(text: &str) -> Option<&str> {
text.split_once("desc(") text.split_once("desc(")
.and_then(|(_, tail)| tail.rsplit_once(")")) .and_then(|(_, tail)| tail.rsplit_once(")"))
.map(|(field, _)| field) .map(|(field, _)| field)
} }
fn update_files_path(path: impl AsRef<Path>) -> PathBuf {
path.as_ref().join("updates/updates_files")
}

View File

@ -8,6 +8,7 @@ use tokio::fs;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::time::sleep; use tokio::time::sleep;
use crate::compression::from_tar_gz;
use crate::index_controller::updates::UpdateMsg; use crate::index_controller::updates::UpdateMsg;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::HardStateIndexResolver;
@ -95,7 +96,7 @@ pub fn load_snapshot(
ignore_missing_snapshot: bool, ignore_missing_snapshot: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() { if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() {
match crate::from_tar_gz(snapshot_path, &db_path) { match from_tar_gz(snapshot_path, &db_path) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(e) => { Err(e) => {
//clean created db folder //clean created db folder

View File

@ -141,8 +141,7 @@ impl UpdateFileStore {
let mut document_buffer = Map::new(); let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer // TODO: we need to find a way to do this more efficiently. (create a custom serializer
// for // for jsonl for example...)
// jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index()? { while let Some((index, document)) = document_reader.next_document_with_index()? {
for (field_id, content) in document.iter() { for (field_id, content) in document.iter() {
if let Some(field_name) = index.get_by_left(&field_id) { if let Some(field_name) = index.get_by_left(&field_id) {
@ -164,4 +163,10 @@ impl UpdateFileStore {
pub fn get_size(&self, uuid: Uuid) -> Result<u64> { pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
Ok(self.get_update(uuid)?.metadata()?.len()) Ok(self.get_update(uuid)?.metadata()?.len())
} }
pub fn delete(&self, uuid: Uuid) -> Result<()> {
let path = self.path.join(uuid.to_string());
std::fs::remove_file(path)?;
Ok(())
}
} }

View File

@ -1,7 +1,7 @@
mod codec; mod codec;
pub mod dump; pub mod dump;
use std::fs::{create_dir_all, remove_file}; use std::fs::create_dir_all;
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
@ -29,7 +29,6 @@ use codec::*;
use super::error::Result; use super::error::Result;
use super::status::{Enqueued, Processing}; use super::status::{Enqueued, Processing};
use crate::index::Index; use crate::index::Index;
use crate::index_controller::update_files_path;
use crate::index_controller::updates::*; use crate::index_controller::updates::*;
use crate::EnvSizer; use crate::EnvSizer;
@ -269,8 +268,8 @@ impl UpdateStore {
Ok(meta) Ok(meta)
} }
// /// Push already processed update in the UpdateStore without triggering the notification /// Push already processed update in the UpdateStore without triggering the notification
// /// process. This is useful for the dumps. /// process. This is useful for the dumps.
pub fn register_raw_updates( pub fn register_raw_updates(
&self, &self,
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
@ -436,19 +435,19 @@ impl UpdateStore {
pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> { pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
// Contains all the content file paths that we need to be removed if the deletion was successful. // Contains all the content file paths that we need to be removed if the deletion was successful.
let uuids_to_remove = Vec::new(); let mut uuids_to_remove = Vec::new();
let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data(); let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
while let Some(Ok(((_, uuid, _), pending))) = pendings.next() { while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
if uuid == index_uuid { if uuid == index_uuid {
let mut _pending = pending.decode()?; let pending = pending.decode()?;
//if let Some(update_uuid) = pending.content.take() { if let Update::DocumentAddition { content_uuid, .. } = pending.meta() {
//uuids_to_remove.push(update_uuid); uuids_to_remove.push(*content_uuid);
//} }
// Invariant check: we can only delete the current entry when we don't hold //Invariant check: we can only delete the current entry when we don't hold
// references to it anymore. This must be done after we have retrieved its content. //references to it anymore. This must be done after we have retrieved its content.
unsafe { unsafe {
pendings.del_current()?; pendings.del_current()?;
} }
@ -485,12 +484,9 @@ impl UpdateStore {
// Finally, remove any outstanding update files. This must be done after waiting for the // Finally, remove any outstanding update files. This must be done after waiting for the
// last update to ensure that the update files are not deleted before the update needs // last update to ensure that the update files are not deleted before the update needs
// them. // them.
uuids_to_remove uuids_to_remove.iter().for_each(|uuid| {
.iter() let _ = self.update_file_store.delete(*uuid);
.map(|uuid: &Uuid| update_files_path(&self.path).join(uuid.to_string())) });
.for_each(|path| {
let _ = remove_file(path);
});
Ok(()) Ok(())
} }

View File

@ -28,30 +28,3 @@ impl EnvSizer for heed::Env {
.fold(0, |acc, m| acc + m.len()) .fold(0, |acc, m| acc + m.len())
} }
} }
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::{Archive, Builder};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;
let gz_encoder = GzEncoder::new(&mut f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
f.flush()?;
Ok(())
}
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let f = File::open(&src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(&dest)?;
ar.unpack(&dest)?;
Ok(())
}