diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs
deleted file mode 100644
index cfc200019..000000000
--- a/meilisearch-lib/src/document_formats.rs
+++ /dev/null
@@ -1,170 +0,0 @@
-use std::borrow::Borrow;
-use std::fmt::{self, Debug, Display};
-use std::io::{self, BufReader, Read, Seek, Write};
-
-use either::Either;
-use meilisearch_types::error::{Code, ErrorCode};
-use meilisearch_types::internal_error;
-use milli::documents::{DocumentsBatchBuilder, Error};
-use milli::Object;
-use serde::Deserialize;
-use serde_json::error::Category;
-
-type Result<T> = std::result::Result<T, DocumentFormatError>;
-
-#[derive(Debug)]
-pub enum PayloadType {
-    Ndjson,
-    Json,
-    Csv,
-}
-
-impl fmt::Display for PayloadType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            PayloadType::Ndjson => f.write_str("ndjson"),
-            PayloadType::Json => f.write_str("json"),
-            PayloadType::Csv => f.write_str("csv"),
-        }
-    }
-}
-
-#[derive(Debug)]
-pub enum DocumentFormatError {
-    Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
-    MalformedPayload(Error, PayloadType),
-}
-
-impl Display for DocumentFormatError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match self {
-            Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
-            Self::MalformedPayload(me, b) => match me.borrow() {
-                Error::Json(se) => {
-                    let mut message = match se.classify() {
-                        Category::Data => {
-                            "data are neither an object nor a list of objects".to_string()
-                        }
-                        _ => se.to_string(),
-                    };
-
-                    // https://github.com/meilisearch/meilisearch/issues/2107
-                    // The user input may be insanely long. We need to truncate it.
-                    let ellipsis = "...";
-                    let trim_input_prefix_len = 50;
-                    let trim_input_suffix_len = 85;
-
-                    if message.len()
-                        > trim_input_prefix_len + trim_input_suffix_len + ellipsis.len()
-                    {
-                        message.replace_range(
-                            trim_input_prefix_len..message.len() - trim_input_suffix_len,
-                            ellipsis,
-                        );
-                    }
-
-                    write!(
-                        f,
-                        "The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
-                        b, message
-                    )
-                }
-                _ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
-            },
-        }
-    }
-}
-
-impl std::error::Error for DocumentFormatError {}
-
-impl From<(PayloadType, Error)> for DocumentFormatError {
-    fn from((ty, error): (PayloadType, Error)) -> Self {
-        match error {
-            Error::Io(e) => Self::Internal(Box::new(e)),
-            e => Self::MalformedPayload(e, ty),
-        }
-    }
-}
-
-impl ErrorCode for DocumentFormatError {
-    fn error_code(&self) -> Code {
-        match self {
-            DocumentFormatError::Internal(_) => Code::Internal,
-            DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
-        }
-    }
-}
-
-internal_error!(DocumentFormatError: io::Error);
-
-/// Reads CSV from input and writes an obkv batch to writer.
-pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let mut builder = DocumentsBatchBuilder::new(writer);
-
-    let csv = csv::Reader::from_reader(input);
-    builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
-
-    let count = builder.documents_count();
-    let _ = builder
-        .into_inner()
-        .map_err(Into::into)
-        .map_err(DocumentFormatError::Internal)?;
-
-    Ok(count as usize)
-}
-
-/// Reads JSON Lines from input and writes an obkv batch to writer.
-pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); - let reader = BufReader::new(input); - - for result in serde_json::Deserializer::from_reader(reader).into_iter() { - let object = result - .map_err(Error::Json) - .map_err(|e| (PayloadType::Ndjson, e))?; - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - } - - let count = builder.documents_count(); - let _ = builder - .into_inner() - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - - Ok(count as usize) -} - -/// Reads JSON from input and write an obkv batch to writer. -pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); - let reader = BufReader::new(input); - - #[derive(Deserialize, Debug)] - #[serde(transparent)] - struct ArrayOrSingleObject { - #[serde(with = "either::serde_untagged")] - inner: Either, Object>, - } - - let content: ArrayOrSingleObject = serde_json::from_reader(reader) - .map_err(Error::Json) - .map_err(|e| (PayloadType::Json, e))?; - - for object in content.inner.map_right(|o| vec![o]).into_inner() { - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - } - - let count = builder.documents_count(); - let _ = builder - .into_inner() - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - - Ok(count as usize) -} diff --git a/meilisearch-lib/src/dump/compat/v2.rs b/meilisearch-lib/src/dump/compat/v2.rs deleted file mode 100644 index ba3b8e3a6..000000000 --- a/meilisearch-lib/src/dump/compat/v2.rs +++ /dev/null @@ -1,152 +0,0 @@ -use anyhow::bail; -use meilisearch_types::error::Code; -use milli::update::IndexDocumentsMethod; -use serde::{Deserialize, Serialize}; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::index::{Settings, Unchecked}; - -#[derive(Serialize, Deserialize)] -pub struct UpdateEntry { - pub uuid: Uuid, - pub update: UpdateStatus, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum UpdateFormat { - Json, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct DocumentAdditionResult { - pub nb_documents: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum UpdateResult { - DocumentsAddition(DocumentAdditionResult), - DocumentDeletion { deleted: u64 }, - Other, -} - -#[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum UpdateMeta { - DocumentsAddition { - method: IndexDocumentsMethod, - format: UpdateFormat, - primary_key: Option, - }, - ClearDocuments, - DeleteDocuments { - ids: Vec, - }, - Settings(Settings), -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Enqueued { - pub update_id: u64, - pub meta: UpdateMeta, - #[serde(with = "time::serde::rfc3339")] - pub enqueued_at: OffsetDateTime, - pub content: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Processed { - pub success: UpdateResult, - #[serde(with = "time::serde::rfc3339")] - pub processed_at: OffsetDateTime, - #[serde(flatten)] - pub from: Processing, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Processing { - #[serde(flatten)] - pub from: Enqueued, - #[serde(with = "time::serde::rfc3339")] - pub started_processing_at: OffsetDateTime, -} - -#[derive(Debug, Serialize, 
Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Aborted { - #[serde(flatten)] - pub from: Enqueued, - #[serde(with = "time::serde::rfc3339")] - pub aborted_at: OffsetDateTime, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Failed { - #[serde(flatten)] - pub from: Processing, - pub error: ResponseError, - #[serde(with = "time::serde::rfc3339")] - pub failed_at: OffsetDateTime, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "status", rename_all = "camelCase")] -pub enum UpdateStatus { - Processing(Processing), - Enqueued(Enqueued), - Processed(Processed), - Aborted(Aborted), - Failed(Failed), -} - -type StatusCode = (); - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ResponseError { - #[serde(skip)] - pub code: StatusCode, - pub message: String, - pub error_code: String, - pub error_type: String, - pub error_link: String, -} - -pub fn error_code_from_str(s: &str) -> anyhow::Result { - let code = match s { - "index_creation_failed" => Code::CreateIndex, - "index_already_exists" => Code::IndexAlreadyExists, - "index_not_found" => Code::IndexNotFound, - "invalid_index_uid" => Code::InvalidIndexUid, - "invalid_state" => Code::InvalidState, - "missing_primary_key" => Code::MissingPrimaryKey, - "primary_key_already_present" => Code::PrimaryKeyAlreadyPresent, - "invalid_request" => Code::InvalidRankingRule, - "max_fields_limit_exceeded" => Code::MaxFieldsLimitExceeded, - "missing_document_id" => Code::MissingDocumentId, - "invalid_facet" => Code::Filter, - "invalid_filter" => Code::Filter, - "invalid_sort" => Code::Sort, - "bad_parameter" => Code::BadParameter, - "bad_request" => Code::BadRequest, - "document_not_found" => Code::DocumentNotFound, - "internal" => Code::Internal, - "invalid_geo_field" => Code::InvalidGeoField, - "invalid_token" => Code::InvalidToken, - "missing_authorization_header" => Code::MissingAuthorizationHeader, - "payload_too_large" => Code::PayloadTooLarge, - "unretrievable_document" => Code::RetrieveDocument, - "search_error" => Code::SearchDocuments, - "unsupported_media_type" => Code::UnsupportedMediaType, - "dump_already_in_progress" => Code::DumpAlreadyInProgress, - "dump_process_failed" => Code::DumpProcessFailed, - _ => bail!("unknown error code."), - }; - - Ok(code) -} diff --git a/meilisearch-lib/src/dump/loaders/v4.rs b/meilisearch-lib/src/dump/loaders/v4.rs deleted file mode 100644 index 44ec23517..000000000 --- a/meilisearch-lib/src/dump/loaders/v4.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::fs::{self, create_dir_all, File}; -use std::io::{BufReader, Write}; -use std::path::Path; - -use fs_extra::dir::{self, CopyOptions}; -use log::info; -use serde_json::{Deserializer, Map, Value}; -use tempfile::tempdir; -use uuid::Uuid; - -use crate::dump::{compat, Metadata}; -use crate::options::IndexerOpts; -use crate::tasks::task::Task; - -pub fn load_dump( - meta: Metadata, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - meta_env_size: usize, - indexing_options: &IndexerOpts, -) -> anyhow::Result<()> { - info!("Patching dump V4 to dump V5..."); - - let patched_dir = tempdir()?; - let options = CopyOptions::default(); - - // Indexes - dir::copy(src.as_ref().join("indexes"), &patched_dir, &options)?; - - // Index uuids - dir::copy(src.as_ref().join("index_uuids"), &patched_dir, &options)?; - - // Metadata - fs::copy( - src.as_ref().join("metadata.json"), - patched_dir.path().join("metadata.json"), - )?; - - // Updates - 
patch_updates(&src, &patched_dir)?; - - // Keys - patch_keys(&src, &patched_dir)?; - - super::v5::load_dump( - meta, - &patched_dir, - dst, - index_db_size, - meta_env_size, - indexing_options, - ) -} - -fn patch_updates(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let updates_path = src.as_ref().join("updates/data.jsonl"); - let output_updates_path = dst.as_ref().join("updates/data.jsonl"); - create_dir_all(output_updates_path.parent().unwrap())?; - let updates_file = File::open(updates_path)?; - let mut output_update_file = File::create(output_updates_path)?; - - serde_json::Deserializer::from_reader(updates_file) - .into_iter::() - .try_for_each(|task| -> anyhow::Result<()> { - let task: Task = task?.into(); - - serde_json::to_writer(&mut output_update_file, &task)?; - output_update_file.write_all(b"\n")?; - - Ok(()) - })?; - - output_update_file.flush()?; - - Ok(()) -} - -fn patch_keys(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let keys_file_src = src.as_ref().join("keys"); - - if !keys_file_src.exists() { - return Ok(()); - } - - fs::create_dir_all(&dst)?; - let keys_file_dst = dst.as_ref().join("keys"); - let mut writer = File::create(&keys_file_dst)?; - - let reader = BufReader::new(File::open(&keys_file_src)?); - for key in Deserializer::from_reader(reader).into_iter() { - let mut key: Map = key?; - - // generate a new uuid v4 and insert it in the key. - let uid = serde_json::to_value(Uuid::new_v4()).unwrap(); - key.insert("uid".to_string(), uid); - - serde_json::to_writer(&mut writer, &key)?; - writer.write_all(b"\n")?; - } - - Ok(()) -} diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs deleted file mode 100644 index 9cc3c033f..000000000 --- a/meilisearch-lib/src/index/dump.rs +++ /dev/null @@ -1,161 +0,0 @@ -use std::fs::{create_dir_all, File}; -use std::io::{BufReader, Seek, SeekFrom, Write}; -use std::path::Path; - -use anyhow::Context; -use indexmap::IndexMap; -use milli::documents::DocumentsBatchReader; -use milli::heed::{EnvOpenOptions, RoTxn}; -use milli::update::{IndexDocumentsConfig, IndexerConfig}; -use serde::{Deserialize, Serialize}; - -use crate::document_formats::read_ndjson; -use crate::index::updates::apply_settings_to_builder; - -use super::error::Result; -use super::{index::Index, Settings, Unchecked}; - -#[derive(Serialize, Deserialize)] -struct DumpMeta { - settings: Settings, - primary_key: Option, -} - -const META_FILE_NAME: &str = "meta.json"; -const DATA_FILE_NAME: &str = "documents.jsonl"; - -impl Index { - pub fn dump(&self, path: impl AsRef) -> Result<()> { - // acquire write txn make sure any ongoing write is finished before we start. 
- let txn = self.write_txn()?; - let path = path.as_ref().join(format!("indexes/{}", self.uuid)); - - create_dir_all(&path)?; - - self.dump_documents(&txn, &path)?; - self.dump_meta(&txn, &path)?; - - Ok(()) - } - - fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { - let document_file_path = path.as_ref().join(DATA_FILE_NAME); - let mut document_file = File::create(&document_file_path)?; - - let documents = self.all_documents(txn)?; - let fields_ids_map = self.fields_ids_map(txn)?; - - // dump documents - let mut json_map = IndexMap::new(); - for document in documents { - let (_, reader) = document?; - - for (fid, bytes) in reader.iter() { - if let Some(name) = fields_ids_map.name(fid) { - json_map.insert(name, serde_json::from_slice::(bytes)?); - } - } - - serde_json::to_writer(&mut document_file, &json_map)?; - document_file.write_all(b"\n")?; - - json_map.clear(); - } - - Ok(()) - } - - fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { - let meta_file_path = path.as_ref().join(META_FILE_NAME); - let mut meta_file = File::create(&meta_file_path)?; - - let settings = self.settings_txn(txn)?.into_unchecked(); - let primary_key = self.primary_key(txn)?.map(String::from); - let meta = DumpMeta { - settings, - primary_key, - }; - - serde_json::to_writer(&mut meta_file, &meta)?; - - Ok(()) - } - - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - size: usize, - indexer_config: &IndexerConfig, - ) -> anyhow::Result<()> { - let dir_name = src - .as_ref() - .file_name() - .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; - - let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); - create_dir_all(&dst_dir_path)?; - - let meta_path = src.as_ref().join(META_FILE_NAME); - let meta_file = File::open(meta_path)?; - let DumpMeta { - settings, - primary_key, - } = serde_json::from_reader(meta_file)?; - let settings = settings.check(); - - let mut options = EnvOpenOptions::new(); - options.map_size(size); - options.max_readers(1024); - let index = milli::Index::new(options, &dst_dir_path)?; - - let mut txn = index.write_txn()?; - - // Apply settings first - let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config); - - if let Some(primary_key) = primary_key { - builder.set_primary_key(primary_key); - } - - apply_settings_to_builder(&settings, &mut builder); - - builder.execute(|_| ())?; - - let document_file_path = src.as_ref().join(DATA_FILE_NAME); - let reader = BufReader::new(File::open(&document_file_path)?); - - let mut tmp_doc_file = tempfile::tempfile()?; - - let empty = match read_ndjson(reader, &mut tmp_doc_file) { - // if there was no document in the file it's because the index was empty - Ok(0) => true, - Ok(_) => false, - Err(e) => return Err(e.into()), - }; - - if !empty { - tmp_doc_file.seek(SeekFrom::Start(0))?; - - let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?; - - //If the document file is empty, we don't perform the document addition, to prevent - //a primary key error to be thrown. 
- let config = IndexDocumentsConfig::default(); - let builder = milli::update::IndexDocuments::new( - &mut txn, - &index, - indexer_config, - config, - |_| (), - )?; - let (builder, user_error) = builder.add_documents(documents_reader)?; - user_error?; - builder.execute()?; - } - - txn.commit()?; - index.prepare_for_closing().wait(); - - Ok(()) - } -} diff --git a/meilisearch-lib/src/index/index.rs b/meilisearch-lib/src/index/index.rs deleted file mode 100644 index 3d6c47949..000000000 --- a/meilisearch-lib/src/index/index.rs +++ /dev/null @@ -1,333 +0,0 @@ -use std::collections::BTreeSet; -use std::fs::create_dir_all; -use std::marker::PhantomData; -use std::ops::Deref; -use std::path::Path; -use std::sync::Arc; - -use fst::IntoStreamer; -use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn}; -use milli::update::{IndexerConfig, Setting}; -use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET}; -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; -use time::OffsetDateTime; -use uuid::Uuid; -use walkdir::WalkDir; - -use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS; - -use super::error::IndexError; -use super::error::Result; -use super::updates::{FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, TypoSettings}; -use super::{Checked, Settings}; - -pub type Document = Map; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - #[serde(with = "time::serde::rfc3339")] - pub created_at: OffsetDateTime, - #[serde(with = "time::serde::rfc3339")] - pub updated_at: OffsetDateTime, - pub primary_key: Option, -} - -impl IndexMeta { - pub fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) - } - - pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result { - let created_at = index.created_at(txn)?; - let updated_at = index.updated_at(txn)?; - let primary_key = index.primary_key(txn)?.map(String::from); - Ok(Self { - created_at, - updated_at, - primary_key, - }) - } -} - -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct IndexStats { - #[serde(skip)] - pub size: u64, - pub number_of_documents: u64, - /// Whether the current index is performing an update. It is initially `None` when the - /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. 
It is - /// later set to either true or false, we we retrieve the information from the `UpdateStore` - pub is_indexing: Option, - pub field_distribution: FieldDistribution, -} - -#[derive(Clone, derivative::Derivative)] -#[derivative(Debug)] -pub struct Index { - pub uuid: Uuid, - #[derivative(Debug = "ignore")] - pub inner: Arc, - #[derivative(Debug = "ignore")] - pub indexer_config: Arc, -} - -impl Deref for Index { - type Target = milli::Index; - - fn deref(&self) -> &Self::Target { - self.inner.as_ref() - } -} - -impl Index { - pub fn open( - path: impl AsRef, - size: usize, - uuid: Uuid, - update_handler: Arc, - ) -> Result { - log::debug!("opening index in {}", path.as_ref().display()); - create_dir_all(&path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - options.max_readers(1024); - let inner = Arc::new(milli::Index::new(options, &path)?); - Ok(Index { - inner, - uuid, - indexer_config: update_handler, - }) - } - - /// Asynchronously close the underlying index - pub fn close(self) { - self.inner.as_ref().clone().prepare_for_closing(); - } - - pub fn stats(&self) -> Result { - let rtxn = self.read_txn()?; - - Ok(IndexStats { - size: self.size(), - number_of_documents: self.number_of_documents(&rtxn)?, - is_indexing: None, - field_distribution: self.field_distribution(&rtxn)?, - }) - } - - pub fn meta(&self) -> Result { - IndexMeta::new(self) - } - pub fn settings(&self) -> Result> { - let txn = self.read_txn()?; - self.settings_txn(&txn) - } - - pub fn uuid(&self) -> Uuid { - self.uuid - } - - pub fn settings_txn(&self, txn: &RoTxn) -> Result> { - let displayed_attributes = self - .displayed_fields(txn)? - .map(|fields| fields.into_iter().map(String::from).collect()); - - let searchable_attributes = self - .user_defined_searchable_fields(txn)? - .map(|fields| fields.into_iter().map(String::from).collect()); - - let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); - - let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); - - let criteria = self - .criteria(txn)? - .into_iter() - .map(|c| c.to_string()) - .collect(); - - let stop_words = self - .stop_words(txn)? - .map(|stop_words| -> Result> { - Ok(stop_words.stream().into_strs()?.into_iter().collect()) - }) - .transpose()? - .unwrap_or_default(); - let distinct_field = self.distinct_field(txn)?.map(String::from); - - // in milli each word in the synonyms map were split on their separator. Since we lost - // this information we are going to put space between words. - let synonyms = self - .synonyms(txn)? - .iter() - .map(|(key, values)| { - ( - key.join(" "), - values.iter().map(|value| value.join(" ")).collect(), - ) - }) - .collect(); - - let min_typo_word_len = MinWordSizeTyposSetting { - one_typo: Setting::Set(self.min_word_len_one_typo(txn)?), - two_typos: Setting::Set(self.min_word_len_two_typos(txn)?), - }; - - let disabled_words = match self.exact_words(txn)? { - Some(fst) => fst.into_stream().into_strs()?.into_iter().collect(), - None => BTreeSet::new(), - }; - - let disabled_attributes = self - .exact_attributes(txn)? - .into_iter() - .map(String::from) - .collect(); - - let typo_tolerance = TypoSettings { - enabled: Setting::Set(self.authorize_typos(txn)?), - min_word_size_for_typos: Setting::Set(min_typo_word_len), - disable_on_words: Setting::Set(disabled_words), - disable_on_attributes: Setting::Set(disabled_attributes), - }; - - let faceting = FacetingSettings { - max_values_per_facet: Setting::Set( - self.max_values_per_facet(txn)? 
- .unwrap_or(DEFAULT_VALUES_PER_FACET), - ), - }; - - let pagination = PaginationSettings { - max_total_hits: Setting::Set( - self.pagination_max_total_hits(txn)? - .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS), - ), - }; - - Ok(Settings { - displayed_attributes: match displayed_attributes { - Some(attrs) => Setting::Set(attrs), - None => Setting::Reset, - }, - searchable_attributes: match searchable_attributes { - Some(attrs) => Setting::Set(attrs), - None => Setting::Reset, - }, - filterable_attributes: Setting::Set(filterable_attributes), - sortable_attributes: Setting::Set(sortable_attributes), - ranking_rules: Setting::Set(criteria), - stop_words: Setting::Set(stop_words), - distinct_attribute: match distinct_field { - Some(field) => Setting::Set(field), - None => Setting::Reset, - }, - synonyms: Setting::Set(synonyms), - typo_tolerance: Setting::Set(typo_tolerance), - faceting: Setting::Set(faceting), - pagination: Setting::Set(pagination), - _kind: PhantomData, - }) - } - - /// Return the total number of documents contained in the index + the selected documents. - pub fn retrieve_documents>( - &self, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result<(u64, Vec)> { - let txn = self.read_txn()?; - - let fields_ids_map = self.fields_ids_map(&txn)?; - let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); - - let mut documents = Vec::new(); - for entry in self.all_documents(&txn)?.skip(offset).take(limit) { - let (_id, obkv) = entry?; - let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?; - let document = match &attributes_to_retrieve { - Some(attributes_to_retrieve) => permissive_json_pointer::select_values( - &document, - attributes_to_retrieve.iter().map(|s| s.as_ref()), - ), - None => document, - }; - documents.push(document); - } - - let number_of_documents = self.number_of_documents(&txn)?; - - Ok((number_of_documents, documents)) - } - - pub fn retrieve_document>( - &self, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let txn = self.read_txn()?; - - let fields_ids_map = self.fields_ids_map(&txn)?; - let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); - - let internal_id = self - .external_documents_ids(&txn)? - .get(doc_id.as_bytes()) - .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; - - let document = self - .documents(&txn, std::iter::once(internal_id))? - .into_iter() - .next() - .map(|(_, d)| d) - .ok_or(IndexError::DocumentNotFound(doc_id))?; - - let document = obkv_to_json(&all_fields, &fields_ids_map, document)?; - let document = match &attributes_to_retrieve { - Some(attributes_to_retrieve) => permissive_json_pointer::select_values( - &document, - attributes_to_retrieve.iter().map(|s| s.as_ref()), - ), - None => document, - }; - - Ok(document) - } - - pub fn size(&self) -> u64 { - WalkDir::new(self.path()) - .into_iter() - .filter_map(|entry| entry.ok()) - .filter_map(|entry| entry.metadata().ok()) - .filter(|metadata| metadata.is_file()) - .fold(0, |acc, m| acc + m.len()) - } - - pub fn snapshot(&self, path: impl AsRef) -> Result<()> { - let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); - create_dir_all(&dst)?; - dst.push("data.mdb"); - let _txn = self.write_txn()?; - self.inner.copy_to_path(dst, CompactionOption::Enabled)?; - Ok(()) - } -} - -/// When running tests, when a server instance is dropped, the environment is not actually closed, -/// leaving a lot of open file descriptors. 
-impl Drop for Index {
-    fn drop(&mut self) {
-        // When dropping the last instance of an index, we want to close the index.
-        // Note that the close is actually performed only if all the instances are effectively
-        // dropped
-
-        if Arc::strong_count(&self.inner) == 1 {
-            self.inner.as_ref().clone().prepare_for_closing();
-        }
-    }
-}
diff --git a/meilisearch-lib/src/index_resolver/index_store.rs b/meilisearch-lib/src/index_resolver/index_store.rs
deleted file mode 100644
index ea3c7125a..000000000
--- a/meilisearch-lib/src/index_resolver/index_store.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use std::collections::HashMap;
-use std::convert::TryFrom;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-
-use milli::update::IndexerConfig;
-use tokio::fs;
-use tokio::sync::RwLock;
-use tokio::task::spawn_blocking;
-use uuid::Uuid;
-
-use super::error::{IndexResolverError, Result};
-use crate::index::Index;
-use crate::options::IndexerOpts;
-
-type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
-
-#[async_trait::async_trait]
-#[cfg_attr(test, mockall::automock)]
-pub trait IndexStore {
-    async fn create(&self, uuid: Uuid) -> Result<Index>;
-    async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
-    async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
-}
-
-pub struct MapIndexStore {
-    index_store: AsyncMap<Uuid, Index>,
-    path: PathBuf,
-    index_size: usize,
-    indexer_config: Arc<IndexerConfig>,
-}
-
-impl MapIndexStore {
-    pub fn new(
-        path: impl AsRef<Path>,
-        index_size: usize,
-        indexer_opts: &IndexerOpts,
-    ) -> anyhow::Result<Self> {
-        let indexer_config = Arc::new(IndexerConfig::try_from(indexer_opts)?);
-        let path = path.as_ref().join("indexes/");
-        let index_store = Arc::new(RwLock::new(HashMap::new()));
-        Ok(Self {
-            index_store,
-            path,
-            index_size,
-            indexer_config,
-        })
-    }
-}
-
-#[async_trait::async_trait]
-impl IndexStore for MapIndexStore {
-    async fn create(&self, uuid: Uuid) -> Result<Index> {
-        // We need to keep the lock until we are sure the db file has been opened correctly, to
-        // ensure that another db is not created at the same time.
- let mut lock = self.index_store.write().await; - - if let Some(index) = lock.get(&uuid) { - return Ok(index.clone()); - } - let path = self.path.join(format!("{}", uuid)); - if path.exists() { - return Err(IndexResolverError::UuidAlreadyExists(uuid)); - } - - let index_size = self.index_size; - let update_handler = self.indexer_config.clone(); - let index = spawn_blocking(move || -> Result { - let index = Index::open(path, index_size, uuid, update_handler)?; - Ok(index) - }) - .await??; - - lock.insert(uuid, index.clone()); - - Ok(index) - } - - async fn get(&self, uuid: Uuid) -> Result> { - let guard = self.index_store.read().await; - match guard.get(&uuid) { - Some(index) => Ok(Some(index.clone())), - None => { - // drop the guard here so we can perform the write after without deadlocking; - drop(guard); - let path = self.path.join(format!("{}", uuid)); - if !path.exists() { - return Ok(None); - } - - let index_size = self.index_size; - let update_handler = self.indexer_config.clone(); - let index = - spawn_blocking(move || Index::open(path, index_size, uuid, update_handler)) - .await??; - self.index_store.write().await.insert(uuid, index.clone()); - Ok(Some(index)) - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result> { - let db_path = self.path.join(format!("{}", uuid)); - fs::remove_dir_all(db_path).await?; - let index = self.index_store.write().await.remove(&uuid); - Ok(index) - } -} diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs deleted file mode 100644 index 3a16daeea..000000000 --- a/meilisearch-lib/src/lib.rs +++ /dev/null @@ -1,50 +0,0 @@ -#[macro_use] -pub mod error; -pub mod options; - -mod analytics; -mod document_formats; -// TODO: TAMO: reenable the dumps -#[cfg(todo)] -mod dump; -mod index_controller; -mod snapshot; - -use std::env::VarError; -use std::ffi::OsStr; -use std::path::Path; - -// TODO: TAMO: rename the MeiliSearch in Meilisearch -pub use index_controller::error::IndexControllerError; -pub use index_controller::Meilisearch as MeiliSearch; -pub use milli; -pub use milli::heed; - -mod compression; - -/// Check if a db is empty. It does not provide any information on the -/// validity of the data in it. -/// We consider a database as non empty when it's a non empty directory. -pub fn is_empty_db(db_path: impl AsRef) -> bool { - let db_path = db_path.as_ref(); - - if !db_path.exists() { - true - // if we encounter an error or if the db is a file we consider the db non empty - } else if let Ok(dir) = db_path.read_dir() { - dir.count() == 0 - } else { - true - } -} - -/// Checks if the key is defined in the environment variables. -/// If not, inserts it with the given value. 
-pub fn export_to_env_if_not_present(key: &str, value: T) -where - T: AsRef, -{ - if let Err(VarError::NotPresent) = std::env::var(key) { - std::env::set_var(key, value); - } -} diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs deleted file mode 100644 index b84dd94a2..000000000 --- a/meilisearch-lib/src/options.rs +++ /dev/null @@ -1,205 +0,0 @@ -use crate::export_to_env_if_not_present; - -use core::fmt; -use std::{convert::TryFrom, num::ParseIntError, ops::Deref, str::FromStr}; - -use byte_unit::{Byte, ByteError}; -use clap::Parser; -use milli::update::IndexerConfig; -use serde::{Deserialize, Serialize}; -use sysinfo::{RefreshKind, System, SystemExt}; - -const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY"; -const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS"; -const DISABLE_AUTO_BATCHING: &str = "DISABLE_AUTO_BATCHING"; -const DEFAULT_LOG_EVERY_N: usize = 100000; - -#[derive(Debug, Clone, Parser, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub struct IndexerOpts { - /// Sets the amount of documents to skip before printing - /// a log regarding the indexing advancement. - #[serde(skip_serializing, default = "default_log_every_n")] - #[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k - pub log_every_n: usize, - - /// Grenad max number of chunks in bytes. - #[serde(skip_serializing)] - #[clap(long, hide = true)] - pub max_nb_chunks: Option, - - /// Sets the maximum amount of RAM Meilisearch can use when indexing. By default, Meilisearch - /// uses no more than two thirds of available memory. - #[clap(long, env = MEILI_MAX_INDEXING_MEMORY, default_value_t)] - #[serde(default)] - pub max_indexing_memory: MaxMemory, - - /// Sets the maximum number of threads Meilisearch can use during indexation. By default, the - /// indexer avoids using more than half of a machine's total processing units. This ensures - /// Meilisearch is always ready to perform searches, even while you are updating an index. - #[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)] - #[serde(default)] - pub max_indexing_threads: MaxThreads, -} - -#[derive(Debug, Clone, Parser, Default, Serialize, Deserialize)] -#[serde(rename_all = "snake_case", deny_unknown_fields)] -pub struct SchedulerConfig { - /// Deactivates auto-batching when provided. - #[clap(long, env = DISABLE_AUTO_BATCHING)] - #[serde(default)] - pub disable_auto_batching: bool, -} - -impl IndexerOpts { - /// Exports the values to their corresponding env vars if they are not set. 
- pub fn export_to_env(self) { - let IndexerOpts { - max_indexing_memory, - max_indexing_threads, - log_every_n: _, - max_nb_chunks: _, - } = self; - if let Some(max_indexing_memory) = max_indexing_memory.0 { - export_to_env_if_not_present( - MEILI_MAX_INDEXING_MEMORY, - max_indexing_memory.to_string(), - ); - } - export_to_env_if_not_present( - MEILI_MAX_INDEXING_THREADS, - max_indexing_threads.0.to_string(), - ); - } -} - -impl TryFrom<&IndexerOpts> for IndexerConfig { - type Error = anyhow::Error; - - fn try_from(other: &IndexerOpts) -> Result { - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(*other.max_indexing_threads) - .build()?; - - Ok(Self { - log_every_n: Some(other.log_every_n), - max_nb_chunks: other.max_nb_chunks, - max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize), - thread_pool: Some(thread_pool), - max_positions_per_attributes: None, - ..Default::default() - }) - } -} - -impl Default for IndexerOpts { - fn default() -> Self { - Self { - log_every_n: 100_000, - max_nb_chunks: None, - max_indexing_memory: MaxMemory::default(), - max_indexing_threads: MaxThreads::default(), - } - } -} - -impl SchedulerConfig { - pub fn export_to_env(self) { - let SchedulerConfig { - disable_auto_batching, - } = self; - export_to_env_if_not_present(DISABLE_AUTO_BATCHING, disable_auto_batching.to_string()); - } -} - -/// A type used to detect the max memory available and use 2/3 of it. -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct MaxMemory(Option); - -impl FromStr for MaxMemory { - type Err = ByteError; - - fn from_str(s: &str) -> Result { - Byte::from_str(s).map(Some).map(MaxMemory) - } -} - -impl Default for MaxMemory { - fn default() -> MaxMemory { - MaxMemory( - total_memory_bytes() - .map(|bytes| bytes * 2 / 3) - .map(Byte::from_bytes), - ) - } -} - -impl fmt::Display for MaxMemory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.0 { - Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)), - None => f.write_str("unknown"), - } - } -} - -impl Deref for MaxMemory { - type Target = Option; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl MaxMemory { - pub fn unlimited() -> Self { - Self(None) - } -} - -/// Returns the total amount of bytes available or `None` if this system isn't supported. 
-fn total_memory_bytes() -> Option { - if System::IS_SUPPORTED { - let memory_kind = RefreshKind::new().with_memory(); - let mut system = System::new_with_specifics(memory_kind); - system.refresh_memory(); - Some(system.total_memory() * 1024) // KiB into bytes - } else { - None - } -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct MaxThreads(usize); - -impl FromStr for MaxThreads { - type Err = ParseIntError; - - fn from_str(s: &str) -> Result { - usize::from_str(s).map(Self) - } -} - -impl Default for MaxThreads { - fn default() -> Self { - MaxThreads(num_cpus::get() / 2) - } -} - -impl fmt::Display for MaxThreads { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Deref for MaxThreads { - type Target = usize; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -fn default_log_every_n() -> usize { - DEFAULT_LOG_EVERY_N -} diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs deleted file mode 100644 index 5d68907c8..000000000 --- a/meilisearch-lib/src/snapshot.rs +++ /dev/null @@ -1,204 +0,0 @@ -use std::fs; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::Duration; - -use anyhow::bail; -use fs_extra::dir::{self, CopyOptions}; -use log::{info, trace}; -use meilisearch_auth::open_auth_store_env; -use milli::heed::CompactionOption; -use tokio::sync::RwLock; -use tokio::time::sleep; -use walkdir::WalkDir; - -use crate::compression::from_tar_gz; -use crate::index_controller::open_meta_env; -use crate::index_controller::versioning::VERSION_FILE_NAME; -use index_scheduler::IndexScheduler; - -pub struct SnapshotService { - pub(crate) db_path: PathBuf, - pub(crate) snapshot_period: Duration, - pub(crate) snapshot_path: PathBuf, - pub(crate) index_size: usize, - pub(crate) meta_env_size: usize, - pub(crate) scheduler: IndexScheduler, -} - -impl SnapshotService { - pub async fn run(self) { - info!( - "Snapshot scheduled every {}s.", - self.snapshot_period.as_secs() - ); - loop { - let snapshot_job = SnapshotJob { - dest_path: self.snapshot_path.clone(), - src_path: self.db_path.clone(), - meta_env_size: self.meta_env_size, - index_size: self.index_size, - }; - // TODO: TAMO: reenable the snapshots - // self.scheduler.write().await.schedule_snapshot(snapshot_job); - sleep(self.snapshot_period).await; - } - } -} - -pub fn load_snapshot( - db_path: impl AsRef, - snapshot_path: impl AsRef, - ignore_snapshot_if_db_exists: bool, - ignore_missing_snapshot: bool, -) -> anyhow::Result<()> { - let empty_db = crate::is_empty_db(&db_path); - let snapshot_path_exists = snapshot_path.as_ref().exists(); - - if empty_db && snapshot_path_exists { - match from_tar_gz(snapshot_path, &db_path) { - Ok(()) => Ok(()), - Err(e) => { - //clean created db folder - std::fs::remove_dir_all(&db_path)?; - Err(e) - } - } - } else if !empty_db && !ignore_snapshot_if_db_exists { - bail!( - "database already exists at {:?}, try to delete it or rename it", - db_path - .as_ref() - .canonicalize() - .unwrap_or_else(|_| db_path.as_ref().to_owned()) - ) - } else if !snapshot_path_exists && !ignore_missing_snapshot { - bail!("snapshot doesn't exist at {:?}", snapshot_path.as_ref()) - } else { - Ok(()) - } -} - -#[derive(Debug)] -pub struct SnapshotJob { - dest_path: PathBuf, - src_path: PathBuf, - - meta_env_size: usize, - index_size: usize, -} - -impl SnapshotJob { - pub async fn run(self) -> anyhow::Result<()> { - tokio::task::spawn_blocking(|| self.run_sync()).await??; - - Ok(()) - } - - fn run_sync(self) -> 
anyhow::Result<()> {
-        trace!("Performing snapshot.");
-
-        let snapshot_dir = self.dest_path.clone();
-        std::fs::create_dir_all(&snapshot_dir)?;
-        let temp_snapshot_dir = tempfile::tempdir()?;
-        let temp_snapshot_path = temp_snapshot_dir.path();
-
-        self.snapshot_version_file(temp_snapshot_path)?;
-        self.snapshot_meta_env(temp_snapshot_path)?;
-        self.snapshot_file_store(temp_snapshot_path)?;
-        self.snapshot_indexes(temp_snapshot_path)?;
-        self.snapshot_auth(temp_snapshot_path)?;
-
-        let db_name = self
-            .src_path
-            .file_name()
-            .and_then(|n| n.to_str())
-            .unwrap_or("data.ms")
-            .to_string();
-
-        let snapshot_path = self.dest_path.join(format!("{}.snapshot", db_name));
-        let temp_snapshot_file = tempfile::NamedTempFile::new_in(&snapshot_dir)?;
-        let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
-        crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
-        let _file = temp_snapshot_file.persist(&snapshot_path)?;
-
-        #[cfg(unix)]
-        {
-            use std::fs::Permissions;
-            use std::os::unix::fs::PermissionsExt;
-
-            let perm = Permissions::from_mode(0o644);
-            _file.set_permissions(perm)?;
-        }
-
-        trace!("Created snapshot in {:?}.", snapshot_path);
-
-        Ok(())
-    }
-
-    fn snapshot_version_file(&self, path: &Path) -> anyhow::Result<()> {
-        let dst = path.join(VERSION_FILE_NAME);
-        let src = self.src_path.join(VERSION_FILE_NAME);
-
-        fs::copy(src, dst)?;
-
-        Ok(())
-    }
-
-    fn snapshot_meta_env(&self, path: &Path) -> anyhow::Result<()> {
-        let env = open_meta_env(&self.src_path, self.meta_env_size)?;
-
-        let dst = path.join("data.mdb");
-        env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
-
-        Ok(())
-    }
-
-    fn snapshot_file_store(&self, path: &Path) -> anyhow::Result<()> {
-        // for now we simply copy the updates/updates_files
-        // FIXME(marin): We may copy more files than necessary, if new files are added while we are
-        // performing the snapshot. We need a way to filter them out.
- - let dst = path.join("updates"); - fs::create_dir_all(&dst)?; - let options = CopyOptions::default(); - dir::copy(self.src_path.join("updates/updates_files"), dst, &options)?; - - Ok(()) - } - - fn snapshot_indexes(&self, path: &Path) -> anyhow::Result<()> { - let indexes_path = self.src_path.join("indexes/"); - let dst = path.join("indexes/"); - - for entry in WalkDir::new(indexes_path).max_depth(1).into_iter().skip(1) { - let entry = entry?; - let name = entry.file_name(); - let dst = dst.join(name); - - std::fs::create_dir_all(&dst)?; - - let dst = dst.join("data.mdb"); - - let mut options = milli::heed::EnvOpenOptions::new(); - options.map_size(self.index_size); - options.max_readers(1024); - let index = milli::Index::new(options, entry.path())?; - index.copy_to_path(dst, CompactionOption::Enabled)?; - } - - Ok(()) - } - - fn snapshot_auth(&self, path: &Path) -> anyhow::Result<()> { - let auth_path = self.src_path.join("auth"); - let dst = path.join("auth"); - std::fs::create_dir_all(&dst)?; - let dst = dst.join("data.mdb"); - - let env = open_auth_store_env(&auth_path)?; - env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?; - - Ok(()) - } -} diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs deleted file mode 100644 index 55dfe17d3..000000000 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ /dev/null @@ -1,420 +0,0 @@ -mod store; - -use std::collections::HashSet; -use std::io::{BufWriter, Write}; -use std::path::Path; -use std::sync::Arc; - -use log::debug; -use milli::heed::{Env, RwTxn}; -use time::OffsetDateTime; - -use super::batch::BatchContent; -use super::error::TaskError; -use super::scheduler::Processing; -use super::task::{Task, TaskContent, TaskId}; -use super::Result; -use crate::tasks::task::TaskEvent; -use crate::update_file_store::UpdateFileStore; - -#[cfg(test)] -pub use store::test::MockStore as Store; -#[cfg(not(test))] -pub use store::Store; - -type FilterFn = Box bool + Sync + Send + 'static>; - -/// Defines constraints to be applied when querying for Tasks from the store. -#[derive(Default)] -pub struct TaskFilter { - indexes: Option>, - filter_fn: Option, -} - -impl TaskFilter { - fn pass(&self, task: &Task) -> bool { - match task.index_uid() { - Some(index_uid) => self - .indexes - .as_ref() - .map_or(true, |indexes| indexes.contains(index_uid)), - None => false, - } - } - - fn filtered_indexes(&self) -> Option<&HashSet> { - self.indexes.as_ref() - } - - /// Adds an index to the filter, so the filter must match this index. 
- pub fn filter_index(&mut self, index: String) { - self.indexes - .get_or_insert_with(Default::default) - .insert(index); - } - - pub fn filter_fn(&mut self, f: FilterFn) { - self.filter_fn.replace(f); - } -} - -pub struct TaskStore { - store: Arc, -} - -impl Clone for TaskStore { - fn clone(&self) -> Self { - Self { - store: self.store.clone(), - } - } -} - -impl TaskStore { - pub fn new(env: Arc) -> Result { - let store = Arc::new(Store::new(env)?); - Ok(Self { store }) - } - - pub async fn register(&self, content: TaskContent) -> Result { - debug!("registering update: {:?}", content); - let store = self.store.clone(); - let task = tokio::task::spawn_blocking(move || -> Result { - let mut txn = store.wtxn()?; - let next_task_id = store.next_task_id(&mut txn)?; - let created_at = TaskEvent::Created(OffsetDateTime::now_utc()); - let task = Task { - id: next_task_id, - content, - events: vec![created_at], - }; - - store.put(&mut txn, &task)?; - txn.commit()?; - - Ok(task) - }) - .await??; - - Ok(task) - } - - pub fn register_raw_update(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { - self.store.put(wtxn, task)?; - Ok(()) - } - - pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { - let store = self.store.clone(); - let task = tokio::task::spawn_blocking(move || -> Result<_> { - let txn = store.rtxn()?; - let task = store.get(&txn, id)?; - Ok(task) - }) - .await?? - .ok_or(TaskError::UnexistingTask(id))?; - - match filter { - Some(filter) => filter - .pass(&task) - .then_some(task) - .ok_or(TaskError::UnexistingTask(id)), - None => Ok(task), - } - } - - /// This methods takes a `Processing` which contains the next task ids to process, and returns - /// the corresponding tasks along with the ownership to the passed processing. - /// - /// We need get_processing_tasks to take ownership over `Processing` because we need it to be - /// valid for 'static. - pub async fn get_processing_tasks( - &self, - processing: Processing, - ) -> Result<(Processing, BatchContent)> { - let store = self.store.clone(); - let tasks = tokio::task::spawn_blocking(move || -> Result<_> { - let txn = store.rtxn()?; - - let content = match processing { - Processing::DocumentAdditions(ref ids) => { - let mut tasks = Vec::new(); - - for id in ids.iter() { - let task = store - .get(&txn, *id)? - .ok_or(TaskError::UnexistingTask(*id))?; - tasks.push(task); - } - BatchContent::DocumentsAdditionBatch(tasks) - } - Processing::IndexUpdate(id) => { - let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?; - BatchContent::IndexUpdate(task) - } - Processing::Dump(id) => { - let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?; - debug_assert!(matches!(task.content, TaskContent::Dump { .. })); - BatchContent::Dump(task) - } - Processing::Nothing => BatchContent::Empty, - }; - - Ok((processing, content)) - }) - .await??; - - Ok(tasks) - } - - pub async fn update_tasks(&self, tasks: Vec) -> Result> { - let store = self.store.clone(); - - let tasks = tokio::task::spawn_blocking(move || -> Result<_> { - let mut txn = store.wtxn()?; - - for task in &tasks { - store.put(&mut txn, task)?; - } - - txn.commit()?; - - Ok(tasks) - }) - .await??; - - Ok(tasks) - } - - pub async fn fetch_unfinished_tasks(&self, offset: Option) -> Result> { - let store = self.store.clone(); - - tokio::task::spawn_blocking(move || { - let txn = store.rtxn()?; - let tasks = store.fetch_unfinished_tasks(&txn, offset)?; - Ok(tasks) - }) - .await? 
- } - - pub async fn list_tasks( - &self, - offset: Option, - filter: Option, - limit: Option, - ) -> Result> { - let store = self.store.clone(); - - tokio::task::spawn_blocking(move || { - let txn = store.rtxn()?; - let tasks = store.list_tasks(&txn, offset, filter, limit)?; - Ok(tasks) - }) - .await? - } - - pub async fn dump( - env: Arc, - dir_path: impl AsRef, - update_file_store: UpdateFileStore, - ) -> Result<()> { - let store = Self::new(env)?; - let update_dir = dir_path.as_ref().join("updates"); - let updates_file = update_dir.join("data.jsonl"); - let tasks = store.list_tasks(None, None, None).await?; - - let dir_path = dir_path.as_ref().to_path_buf(); - tokio::task::spawn_blocking(move || -> Result<()> { - std::fs::create_dir(&update_dir)?; - let updates_file = std::fs::File::create(updates_file)?; - let mut updates_file = BufWriter::new(updates_file); - - for task in tasks { - serde_json::to_writer(&mut updates_file, &task)?; - updates_file.write_all(b"\n")?; - - if !task.is_finished() { - if let Some(content_uuid) = task.get_content_uuid() { - update_file_store.dump(content_uuid, &dir_path)?; - } - } - } - updates_file.flush()?; - Ok(()) - }) - .await??; - - Ok(()) - } - - pub fn load_dump(src: impl AsRef, env: Arc) -> anyhow::Result<()> { - // create a dummy update field store, since it is not needed right now. - let store = Self::new(env.clone())?; - - let src_update_path = src.as_ref().join("updates"); - let update_data = std::fs::File::open(&src_update_path.join("data.jsonl"))?; - let update_data = std::io::BufReader::new(update_data); - - let stream = serde_json::Deserializer::from_reader(update_data).into_iter::(); - - let mut wtxn = env.write_txn()?; - for entry in stream { - store.register_raw_update(&mut wtxn, &entry?)?; - } - wtxn.commit()?; - - Ok(()) - } -} - -#[cfg(test)] -pub mod test { - use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env}; - - use super::*; - - use meilisearch_types::index_uid::IndexUid; - use nelson::Mocker; - use proptest::{ - strategy::Strategy, - test_runner::{Config, TestRunner}, - }; - - pub enum MockTaskStore { - Real(TaskStore), - Mock(Arc), - } - - impl Clone for MockTaskStore { - fn clone(&self) -> Self { - match self { - Self::Real(x) => Self::Real(x.clone()), - Self::Mock(x) => Self::Mock(x.clone()), - } - } - } - - impl MockTaskStore { - pub fn new(env: Arc) -> Result { - Ok(Self::Real(TaskStore::new(env)?)) - } - - pub async fn dump( - env: Arc, - path: impl AsRef, - update_file_store: UpdateFileStore, - ) -> Result<()> { - TaskStore::dump(env, path, update_file_store).await - } - - pub fn mock(mocker: Mocker) -> Self { - Self::Mock(Arc::new(mocker)) - } - - pub async fn update_tasks(&self, tasks: Vec) -> Result> { - match self { - Self::Real(s) => s.update_tasks(tasks).await, - Self::Mock(m) => unsafe { - m.get::<_, Result>>("update_tasks").call(tasks) - }, - } - } - - pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { - match self { - Self::Real(s) => s.get_task(id, filter).await, - Self::Mock(m) => unsafe { m.get::<_, Result>("get_task").call((id, filter)) }, - } - } - - pub async fn get_processing_tasks( - &self, - tasks: Processing, - ) -> Result<(Processing, BatchContent)> { - match self { - Self::Real(s) => s.get_processing_tasks(tasks).await, - Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) }, - } - } - - pub async fn fetch_unfinished_tasks(&self, from: Option) -> Result> { - match self { - Self::Real(s) => s.fetch_unfinished_tasks(from).await, - Self::Mock(m) => 
unsafe { m.get("fetch_unfinished_tasks").call(from) }, - } - } - - pub async fn list_tasks( - &self, - from: Option, - filter: Option, - limit: Option, - ) -> Result> { - match self { - Self::Real(s) => s.list_tasks(from, filter, limit).await, - Self::Mock(m) => unsafe { m.get("list_tasks").call((from, filter, limit)) }, - } - } - - pub async fn register(&self, content: TaskContent) -> Result { - match self { - Self::Real(s) => s.register(content).await, - Self::Mock(_m) => todo!(), - } - } - - pub fn register_raw_update(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { - match self { - Self::Real(s) => s.register_raw_update(wtxn, task), - Self::Mock(_m) => todo!(), - } - } - - pub fn load_dump(path: impl AsRef, env: Arc) -> anyhow::Result<()> { - TaskStore::load_dump(path, env) - } - } - - #[test] - fn test_increment_task_id() { - let tmp = tmp_env(); - let store = Store::new(tmp.env()).unwrap(); - - let mut txn = store.wtxn().unwrap(); - assert_eq!(store.next_task_id(&mut txn).unwrap(), 0); - txn.abort().unwrap(); - - let gen_task = |id: TaskId| Task { - id, - content: TaskContent::IndexCreation { - primary_key: None, - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - let mut runner = TestRunner::new(Config::default()); - runner - .run(&(0..100u32).prop_map(gen_task), |task| { - let mut txn = store.wtxn().unwrap(); - let previous_id = store.next_task_id(&mut txn).unwrap(); - - store.put(&mut txn, &task).unwrap(); - - let next_id = store.next_task_id(&mut txn).unwrap(); - - // if we put a task whose task_id is less than the next_id, then the next_id remains - // unchanged, otherwise it becomes task.id + 1 - if task.id < previous_id { - assert_eq!(next_id, previous_id) - } else { - assert_eq!(next_id, task.id + 1); - } - - txn.commit().unwrap(); - - Ok(()) - }) - .unwrap(); - } -} diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs deleted file mode 100644 index 32b20aeb8..000000000 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ /dev/null @@ -1,377 +0,0 @@ -#[allow(clippy::upper_case_acronyms)] - -type BEU32 = milli::heed::zerocopy::U32; - -const INDEX_UIDS_TASK_IDS: &str = "index-uids-task-ids"; -const TASKS: &str = "tasks"; - -use std::collections::HashSet; -use std::ops::Bound::{Excluded, Unbounded}; -use std::result::Result as StdResult; -use std::sync::Arc; - -use milli::heed::types::{OwnedType, SerdeJson, Str}; -use milli::heed::{Database, Env, RoTxn, RwTxn}; -use milli::heed_codec::RoaringBitmapCodec; -use roaring::RoaringBitmap; - -use crate::tasks::task::{Task, TaskId}; - -use super::super::Result; -use super::TaskFilter; - -pub struct Store { - env: Arc, - /// Maps an index uid to the set of tasks ids associated to it. - index_uid_task_ids: Database, - tasks: Database, SerdeJson>, -} - -impl Drop for Store { - fn drop(&mut self) { - if Arc::strong_count(&self.env) == 1 { - self.env.as_ref().clone().prepare_for_closing(); - } - } -} - -impl Store { - /// Create a new store from the specified `Path`. - /// Be really cautious when calling this function, the returned `Store` may - /// be in an invalid state, with dangling processing tasks. - /// You want to patch all un-finished tasks and put them in your pending - /// queue with the `reset_and_return_unfinished_update` method. 
- pub fn new(env: Arc) -> Result { - let index_uid_task_ids = env.create_database(Some(INDEX_UIDS_TASK_IDS))?; - let tasks = env.create_database(Some(TASKS))?; - - Ok(Self { - env, - index_uid_task_ids, - tasks, - }) - } - - pub fn wtxn(&self) -> Result { - Ok(self.env.write_txn()?) - } - - pub fn rtxn(&self) -> Result { - Ok(self.env.read_txn()?) - } - - /// Returns the id for the next task. - /// - /// The required `mut txn` acts as a reservation system. It guarantees that as long as you commit - /// the task to the store in the same transaction, no one else will have this task id. - pub fn next_task_id(&self, txn: &mut RwTxn) -> Result { - let id = self - .tasks - .lazily_decode_data() - .last(txn)? - .map(|(id, _)| id.get() + 1) - .unwrap_or(0); - Ok(id) - } - - pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { - self.tasks.put(txn, &BEU32::new(task.id), task)?; - // only add the task to the indexes index if it has an index_uid - if let Some(index_uid) = task.index_uid() { - let mut tasks_set = self - .index_uid_task_ids - .get(txn, index_uid)? - .unwrap_or_default(); - - tasks_set.insert(task.id); - - self.index_uid_task_ids.put(txn, index_uid, &tasks_set)?; - } - - Ok(()) - } - - pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result> { - let task = self.tasks.get(txn, &BEU32::new(id))?; - Ok(task) - } - - /// Returns the unfinished tasks starting from the given taskId in ascending order. - pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option) -> Result> { - // We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file. - // - // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. - let from = from.unwrap_or_default(); - - let result: StdResult, milli::heed::Error> = self - .tasks - .range(txn, &(BEU32::new(from)..))? - .map(|r| r.map(|(_, t)| t)) - .filter(|result| result.as_ref().map_or(true, |t| !t.is_finished())) - .collect(); - - result.map_err(Into::into) - } - - /// Returns all the tasks starting from the given taskId and going in descending order. - pub fn list_tasks( - &self, - txn: &RoTxn, - from: Option, - filter: Option, - limit: Option, - ) -> Result> { - let from = match from { - Some(from) => from, - None => self.tasks.last(txn)?.map_or(0, |(id, _)| id.get()), - }; - - let filter_fn = |task: &Task| { - filter - .as_ref() - .and_then(|f| f.filter_fn.as_ref()) - .map_or(true, |f| f(task)) - }; - - let result: Result> = match filter.as_ref().and_then(|f| f.filtered_indexes()) { - Some(indexes) => self - .compute_candidates(txn, indexes, from)? - .filter(|result| result.as_ref().map_or(true, filter_fn)) - .take(limit.unwrap_or(usize::MAX)) - .collect(), - None => self - .tasks - .rev_range(txn, &(..=BEU32::new(from)))? - .map(|r| r.map(|(_, t)| t).map_err(Into::into)) - .filter(|result| result.as_ref().map_or(true, filter_fn)) - .take(limit.unwrap_or(usize::MAX)) - .collect(), - }; - - result.map_err(Into::into) - } - - fn compute_candidates<'a>( - &'a self, - txn: &'a RoTxn, - indexes: &HashSet, - from: TaskId, - ) -> Result> + 'a> { - let mut candidates = RoaringBitmap::new(); - - for index_uid in indexes { - if let Some(tasks_set) = self.index_uid_task_ids.get(txn, index_uid)? 
{ - candidates |= tasks_set; - } - } - - candidates.remove_range((Excluded(from), Unbounded)); - - let iter = candidates - .into_iter() - .rev() - .filter_map(|id| self.get(txn, id).transpose()); - - Ok(iter) - } -} - -#[cfg(test)] -pub mod test { - use itertools::Itertools; - use meilisearch_types::index_uid::IndexUid; - use milli::heed::EnvOpenOptions; - use nelson::Mocker; - use tempfile::TempDir; - - use crate::tasks::task::TaskContent; - - use super::*; - - /// TODO: use this mock to test the task store properly. - #[allow(dead_code)] - pub enum MockStore { - Real(Store), - Fake(Mocker), - } - - pub struct TmpEnv(TempDir, Arc); - - impl TmpEnv { - pub fn env(&self) -> Arc { - self.1.clone() - } - } - - pub fn tmp_env() -> TmpEnv { - let tmp = tempfile::tempdir().unwrap(); - - let mut options = EnvOpenOptions::new(); - options.map_size(4096 * 100000); - options.max_dbs(1000); - let env = Arc::new(options.open(tmp.path()).unwrap()); - - TmpEnv(tmp, env) - } - - impl MockStore { - pub fn new(env: Arc) -> Result { - Ok(Self::Real(Store::new(env)?)) - } - - pub fn wtxn(&self) -> Result { - match self { - MockStore::Real(index) => index.wtxn(), - MockStore::Fake(_) => todo!(), - } - } - - pub fn rtxn(&self) -> Result { - match self { - MockStore::Real(index) => index.rtxn(), - MockStore::Fake(_) => todo!(), - } - } - - pub fn next_task_id(&self, txn: &mut RwTxn) -> Result { - match self { - MockStore::Real(index) => index.next_task_id(txn), - MockStore::Fake(_) => todo!(), - } - } - - pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { - match self { - MockStore::Real(index) => index.put(txn, task), - MockStore::Fake(_) => todo!(), - } - } - - pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result> { - match self { - MockStore::Real(index) => index.get(txn, id), - MockStore::Fake(_) => todo!(), - } - } - - pub fn fetch_unfinished_tasks( - &self, - txn: &RoTxn, - from: Option, - ) -> Result> { - match self { - MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from), - MockStore::Fake(_) => todo!(), - } - } - - pub fn list_tasks( - &self, - txn: &RoTxn, - from: Option, - filter: Option, - limit: Option, - ) -> Result> { - match self { - MockStore::Real(index) => index.list_tasks(txn, from, filter, limit), - MockStore::Fake(_) => todo!(), - } - } - } - - #[test] - fn test_ordered_filtered_updates() { - let tmp = tmp_env(); - let store = Store::new(tmp.env()).unwrap(); - - let tasks = (0..100) - .map(|_| Task { - id: rand::random(), - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test"), - }, - events: vec![], - }) - .collect::>(); - - let mut txn = store.env.write_txn().unwrap(); - tasks - .iter() - .try_for_each(|t| store.put(&mut txn, t)) - .unwrap(); - - let mut filter = TaskFilter::default(); - filter.filter_index("test".into()); - - let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap(); - - assert!(tasks - .iter() - .map(|t| t.id) - .tuple_windows() - .all(|(a, b)| a > b)); - } - - #[test] - fn test_filter_same_index_prefix() { - let tmp = tmp_env(); - let store = Store::new(tmp.env()).unwrap(); - - let task_1 = Task { - id: 1, - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test"), - }, - events: vec![], - }; - - let task_2 = Task { - id: 0, - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test1"), - }, - events: vec![], - }; - - let mut txn = store.wtxn().unwrap(); - store.put(&mut txn, &task_1).unwrap(); - store.put(&mut txn, &task_2).unwrap(); - - let mut filter 
= TaskFilter::default(); - filter.filter_index("test".into()); - - let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap(); - - txn.abort().unwrap(); - assert_eq!(tasks.len(), 1); - assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test"); - - // same thing but invert the ids - let task_1 = Task { - id: 0, - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test"), - }, - events: vec![], - }; - let task_2 = Task { - id: 1, - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test1"), - }, - events: vec![], - }; - - let mut txn = store.wtxn().unwrap(); - store.put(&mut txn, &task_1).unwrap(); - store.put(&mut txn, &task_2).unwrap(); - - let mut filter = TaskFilter::default(); - filter.filter_index("test".into()); - - let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap(); - - assert_eq!(tasks.len(), 1); - assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test"); - } -}
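
The two regression tests at the end of store.rs rely on `TaskFilter` matching index uids exactly rather than by prefix, so a filter on `test` must never return tasks for `test1`. The snippet below is a minimal, self-contained sketch of that exact-match behaviour; the `Task` and `TaskFilter` types here are simplified stand-ins for the deleted, heed-backed ones, not the real implementation.

use std::collections::HashSet;

// Simplified stand-ins for the deleted `Task` and `TaskFilter` types.
struct Task {
    id: u64,
    index_uid: Option<String>,
}

#[derive(Default)]
struct TaskFilter {
    indexes: Option<HashSet<String>>,
}

impl TaskFilter {
    fn filter_index(&mut self, index: String) {
        self.indexes.get_or_insert_with(HashSet::new).insert(index);
    }

    // Exact-match semantics: a task passes only if its index uid is listed verbatim.
    fn pass(&self, task: &Task) -> bool {
        match &task.index_uid {
            Some(uid) => self.indexes.as_ref().map_or(true, |set| set.contains(uid)),
            None => false,
        }
    }
}

fn main() {
    let mut filter = TaskFilter::default();
    filter.filter_index("test".into());

    let tasks = vec![
        Task { id: 0, index_uid: Some("test".into()) },
        Task { id: 1, index_uid: Some("test1".into()) },
    ];

    // Only the task targeting exactly "test" passes; "test1" is not treated as a match.
    let kept: Vec<u64> = tasks.iter().filter(|t| filter.pass(t)).map(|t| t.id).collect();
    assert_eq!(kept, vec![0]);
}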