From d8b8e04ad14c56c1969e7d0dabc8b7a731426af1 Mon Sep 17 00:00:00 2001 From: Irevoire Date: Wed, 7 Sep 2022 20:08:07 +0200 Subject: [PATCH] wip porting the index back in the scheduler --- Cargo.lock | 10 + index-scheduler/Cargo.toml | 10 + index-scheduler/src/batch.rs | 20 +- index-scheduler/src/error.rs | 6 +- index-scheduler/src/index/dump.rs | 160 ++++++ index-scheduler/src/index/error.rs | 61 ++ index-scheduler/src/index/index.rs | 326 +++++++++++ index-scheduler/src/index/mod.rs | 249 ++++++++ index-scheduler/src/index/search.rs | 688 +++++++++++++++++++++++ index-scheduler/src/index/updates.rs | 562 ++++++++++++++++++ index-scheduler/src/lib.rs | 126 ++++- index-scheduler/src/task.rs | 8 +- index-scheduler/src/update_file_store.rs | 258 +++++++++ index-scheduler/src/utils.rs | 30 + 14 files changed, 2493 insertions(+), 21 deletions(-) create mode 100644 index-scheduler/src/index/dump.rs create mode 100644 index-scheduler/src/index/error.rs create mode 100644 index-scheduler/src/index/index.rs create mode 100644 index-scheduler/src/index/mod.rs create mode 100644 index-scheduler/src/index/search.rs create mode 100644 index-scheduler/src/index/updates.rs create mode 100644 index-scheduler/src/update_file_store.rs diff --git a/Cargo.lock b/Cargo.lock index 3ae4e2ac8..5b3d81828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1660,11 +1660,21 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "derivative", + "either", + "fst", + "indexmap", + "log", + "meilisearch-types", "milli 0.33.0", + "permissive-json-pointer", + "regex", "roaring 0.9.0", "serde", + "serde_json", "thiserror", "time", + "uuid", ] [[package]] diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index 057e59324..4de98cdb0 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -8,8 +8,18 @@ edition = "2021" [dependencies] anyhow = "1.0.64" bincode = "1.3.3" +derivative = "2.2.0" +fst = "0.4.7" +indexmap = { version = "1.8.0", features = ["serde-1"] } +log = "0.4.14" milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" } +permissive-json-pointer = { path = "../permissive-json-pointer" } +meilisearch-types = { path = "../meilisearch-types" } +regex = "1.5.5" +either = { version = "1.6.1", features = ["serde"] } roaring = "0.9.0" serde = { version = "1.0.136", features = ["derive"] } +serde_json = { version = "1.0.85", features = ["preserve_order"] } thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } +uuid = { version = "1.1.2", features = ["serde", "v4"] } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e97a8fd0..7ef056362 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,12 +1,12 @@ use crate::{ task::{KindWithContent, Status}, - Error, IndexScheduler, Result, + Error, IndexScheduler, Result, TaskId, }; use milli::heed::RoTxn; use crate::{task::Kind, Task}; -pub enum Batch { +pub(crate) enum Batch { Cancel(Task), Snapshot(Vec), Dump(Vec), @@ -21,7 +21,7 @@ impl IndexScheduler { /// 2. We get the *next* snapshot to process. /// 3. We get the *next* dump to process. /// 4. We get the *next* tasks to process for a specific index. - fn get_next_batch(&self, rtxn: &RoTxn) -> Result { + pub(crate) fn get_next_batch(&self, rtxn: &RoTxn) -> Result { let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? 
& enqueued; @@ -108,3 +108,17 @@ impl IndexScheduler { }) } } + +impl Batch { + pub fn task_ids(&self) -> impl IntoIterator + '_ { + match self { + Batch::Cancel(task) | Batch::One(task) => { + Box::new(std::iter::once(task.uid)) as Box> + } + Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::Contiguous { tasks, .. } => { + Box::new(tasks.iter().map(|task| task.uid)) as Box> + } + Batch::Empty => Box::new(std::iter::empty()) as Box>, + } + } +} diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 5b467456b..212a9b04d 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -3,8 +3,10 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("Index not found")] - IndexNotFound, + #[error("Index `{}` not found", .0)] + IndexNotFound(String), + #[error("Index `{}` already exists", .0)] + IndexAlreadyExists(String), #[error("Corrupted task queue.")] CorruptedTaskQueue, #[error(transparent)] diff --git a/index-scheduler/src/index/dump.rs b/index-scheduler/src/index/dump.rs new file mode 100644 index 000000000..6a41fa7a0 --- /dev/null +++ b/index-scheduler/src/index/dump.rs @@ -0,0 +1,160 @@ +use std::fs::{create_dir_all, File}; +use std::io::{BufReader, Seek, SeekFrom, Write}; +use std::path::Path; + +use anyhow::Context; +use indexmap::IndexMap; +use milli::documents::DocumentsBatchReader; +use milli::heed::{EnvOpenOptions, RoTxn}; +use milli::update::{IndexDocumentsConfig, IndexerConfig}; +use serde::{Deserialize, Serialize}; + +use crate::document_formats::read_ndjson; +use crate::index::updates::apply_settings_to_builder; + +use super::error::Result; +use super::{index::Index, Settings, Unchecked}; + +#[derive(Serialize, Deserialize)] +struct DumpMeta { + settings: Settings, + primary_key: Option, +} + +const META_FILE_NAME: &str = "meta.json"; +const DATA_FILE_NAME: &str = "documents.jsonl"; + +impl Index { + pub fn dump(&self, path: impl AsRef) -> Result<()> { + // acquire write txn make sure any ongoing write is finished before we start. 
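+ // the dump itself only reads, but holding the write txn keeps other writers out until the dump is finished.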
+ let txn = self.write_txn()?; + let path = path.as_ref().join(format!("indexes/{}", self.uuid)); + + create_dir_all(&path)?; + + self.dump_documents(&txn, &path)?; + self.dump_meta(&txn, &path)?; + + Ok(()) + } + + fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { + let document_file_path = path.as_ref().join(DATA_FILE_NAME); + let mut document_file = File::create(&document_file_path)?; + + let documents = self.all_documents(txn)?; + let fields_ids_map = self.fields_ids_map(txn)?; + + // dump documents + let mut json_map = IndexMap::new(); + for document in documents { + let (_, reader) = document?; + + for (fid, bytes) in reader.iter() { + if let Some(name) = fields_ids_map.name(fid) { + json_map.insert(name, serde_json::from_slice::(bytes)?); + } + } + + serde_json::to_writer(&mut document_file, &json_map)?; + document_file.write_all(b"\n")?; + + json_map.clear(); + } + + Ok(()) + } + + fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { + let meta_file_path = path.as_ref().join(META_FILE_NAME); + let mut meta_file = File::create(&meta_file_path)?; + + let settings = self.settings_txn(txn)?.into_unchecked(); + let primary_key = self.primary_key(txn)?.map(String::from); + let meta = DumpMeta { + settings, + primary_key, + }; + + serde_json::to_writer(&mut meta_file, &meta)?; + + Ok(()) + } + + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + size: usize, + indexer_config: &IndexerConfig, + ) -> anyhow::Result<()> { + let dir_name = src + .as_ref() + .file_name() + .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; + + let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); + create_dir_all(&dst_dir_path)?; + + let meta_path = src.as_ref().join(META_FILE_NAME); + let meta_file = File::open(meta_path)?; + let DumpMeta { + settings, + primary_key, + } = serde_json::from_reader(meta_file)?; + let settings = settings.check(); + + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let index = milli::Index::new(options, &dst_dir_path)?; + + let mut txn = index.write_txn()?; + + // Apply settings first + let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config); + + if let Some(primary_key) = primary_key { + builder.set_primary_key(primary_key); + } + + apply_settings_to_builder(&settings, &mut builder); + + builder.execute(|_| ())?; + + let document_file_path = src.as_ref().join(DATA_FILE_NAME); + let reader = BufReader::new(File::open(&document_file_path)?); + + let mut tmp_doc_file = tempfile::tempfile()?; + + let empty = match read_ndjson(reader, &mut tmp_doc_file) { + // if there was no document in the file it's because the index was empty + Ok(0) => true, + Ok(_) => false, + Err(e) => return Err(e.into()), + }; + + if !empty { + tmp_doc_file.seek(SeekFrom::Start(0))?; + + let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?; + + //If the document file is empty, we don't perform the document addition, to prevent + //a primary key error to be thrown. 
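+ // `add_documents` surfaces any user-side error before `execute` performs the actual indexing.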
+ let config = IndexDocumentsConfig::default(); + let builder = milli::update::IndexDocuments::new( + &mut txn, + &index, + indexer_config, + config, + |_| (), + )?; + let (builder, user_error) = builder.add_documents(documents_reader)?; + user_error?; + builder.execute()?; + } + + txn.commit()?; + index.prepare_for_closing().wait(); + + Ok(()) + } +} diff --git a/index-scheduler/src/index/error.rs b/index-scheduler/src/index/error.rs new file mode 100644 index 000000000..f795ceaa4 --- /dev/null +++ b/index-scheduler/src/index/error.rs @@ -0,0 +1,61 @@ +use std::error::Error; + +use meilisearch_types::error::{Code, ErrorCode}; +use meilisearch_types::internal_error; +use serde_json::Value; + +use crate::{error::MilliError, update_file_store}; + +pub type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum IndexError { + #[error("An internal error has occurred. `{0}`.")] + Internal(Box), + #[error("Document `{0}` not found.")] + DocumentNotFound(String), + #[error("{0}")] + Facet(#[from] FacetError), + #[error("{0}")] + Milli(#[from] milli::Error), +} + +internal_error!( + IndexError: std::io::Error, + milli::heed::Error, + fst::Error, + serde_json::Error, + update_file_store::UpdateFileStoreError, + milli::documents::Error +); + +impl ErrorCode for IndexError { + fn error_code(&self) -> Code { + match self { + IndexError::Internal(_) => Code::Internal, + IndexError::DocumentNotFound(_) => Code::DocumentNotFound, + IndexError::Facet(e) => e.error_code(), + IndexError::Milli(e) => MilliError(e).error_code(), + } + } +} + +impl From for IndexError { + fn from(error: milli::UserError) -> IndexError { + IndexError::Milli(error.into()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum FacetError { + #[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))] + InvalidExpression(&'static [&'static str], Value), +} + +impl ErrorCode for FacetError { + fn error_code(&self) -> Code { + match self { + FacetError::InvalidExpression(_, _) => Code::Filter, + } + } +} diff --git a/index-scheduler/src/index/index.rs b/index-scheduler/src/index/index.rs new file mode 100644 index 000000000..f32c842ed --- /dev/null +++ b/index-scheduler/src/index/index.rs @@ -0,0 +1,326 @@ +use std::collections::BTreeSet; +use std::fs::create_dir_all; +use std::marker::PhantomData; +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use fst::IntoStreamer; +use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn}; +use milli::update::{IndexerConfig, Setting}; +use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use time::OffsetDateTime; +use uuid::Uuid; + +use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS; + +use super::error::IndexError; +use super::error::Result; +use super::updates::{FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, TypoSettings}; +use super::{Checked, Settings}; + +pub type Document = Map; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMeta { + #[serde(with = "time::serde::rfc3339")] + pub created_at: OffsetDateTime, + #[serde(with = "time::serde::rfc3339")] + pub updated_at: OffsetDateTime, + pub primary_key: Option, +} + +impl IndexMeta { + pub fn new(index: &Index) -> Result { + let txn = index.read_txn()?; + Self::new_txn(index, &txn) + } + + pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result { + let created_at = index.created_at(txn)?; 
+ let updated_at = index.updated_at(txn)?; + let primary_key = index.primary_key(txn)?.map(String::from); + Ok(Self { + created_at, + updated_at, + primary_key, + }) + } +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct IndexStats { + #[serde(skip)] + pub size: u64, + pub number_of_documents: u64, + /// Whether the current index is performing an update. It is initially `None` when the + /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is + /// later set to either true or false, we we retrieve the information from the `UpdateStore` + pub is_indexing: Option, + pub field_distribution: FieldDistribution, +} + +#[derive(Clone, derivative::Derivative)] +#[derivative(Debug)] +pub struct Index { + pub name: String, + #[derivative(Debug = "ignore")] + pub inner: Arc, + #[derivative(Debug = "ignore")] + pub indexer_config: Arc, +} + +impl Deref for Index { + type Target = milli::Index; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + +impl Index { + pub fn open( + path: impl AsRef, + name: String, + size: usize, + update_handler: Arc, + ) -> Result { + log::debug!("opening index in {}", path.as_ref().display()); + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let inner = Arc::new(milli::Index::new(options, &path)?); + Ok(Index { + name, + inner, + indexer_config: update_handler, + }) + } + + /// Asynchronously close the underlying index + pub fn close(self) { + self.inner.as_ref().clone().prepare_for_closing(); + } + + pub fn stats(&self) -> Result { + let rtxn = self.read_txn()?; + + Ok(IndexStats { + size: self.size()?, + number_of_documents: self.number_of_documents(&rtxn)?, + is_indexing: None, + field_distribution: self.field_distribution(&rtxn)?, + }) + } + + pub fn meta(&self) -> Result { + IndexMeta::new(self) + } + pub fn settings(&self) -> Result> { + let txn = self.read_txn()?; + self.settings_txn(&txn) + } + + pub fn name(&self) -> String { + self.name + } + + pub fn settings_txn(&self, txn: &RoTxn) -> Result> { + let displayed_attributes = self + .displayed_fields(txn)? + .map(|fields| fields.into_iter().map(String::from).collect()); + + let searchable_attributes = self + .user_defined_searchable_fields(txn)? + .map(|fields| fields.into_iter().map(String::from).collect()); + + let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); + + let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); + + let criteria = self + .criteria(txn)? + .into_iter() + .map(|c| c.to_string()) + .collect(); + + let stop_words = self + .stop_words(txn)? + .map(|stop_words| -> Result> { + Ok(stop_words.stream().into_strs()?.into_iter().collect()) + }) + .transpose()? + .unwrap_or_default(); + let distinct_field = self.distinct_field(txn)?.map(String::from); + + // in milli each word in the synonyms map were split on their separator. Since we lost + // this information we are going to put space between words. + let synonyms = self + .synonyms(txn)? + .iter() + .map(|(key, values)| { + ( + key.join(" "), + values.iter().map(|value| value.join(" ")).collect(), + ) + }) + .collect(); + + let min_typo_word_len = MinWordSizeTyposSetting { + one_typo: Setting::Set(self.min_word_len_one_typo(txn)?), + two_typos: Setting::Set(self.min_word_len_two_typos(txn)?), + }; + + let disabled_words = match self.exact_words(txn)? 
{ + Some(fst) => fst.into_stream().into_strs()?.into_iter().collect(), + None => BTreeSet::new(), + }; + + let disabled_attributes = self + .exact_attributes(txn)? + .into_iter() + .map(String::from) + .collect(); + + let typo_tolerance = TypoSettings { + enabled: Setting::Set(self.authorize_typos(txn)?), + min_word_size_for_typos: Setting::Set(min_typo_word_len), + disable_on_words: Setting::Set(disabled_words), + disable_on_attributes: Setting::Set(disabled_attributes), + }; + + let faceting = FacetingSettings { + max_values_per_facet: Setting::Set( + self.max_values_per_facet(txn)? + .unwrap_or(DEFAULT_VALUES_PER_FACET), + ), + }; + + let pagination = PaginationSettings { + max_total_hits: Setting::Set( + self.pagination_max_total_hits(txn)? + .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS), + ), + }; + + Ok(Settings { + displayed_attributes: match displayed_attributes { + Some(attrs) => Setting::Set(attrs), + None => Setting::Reset, + }, + searchable_attributes: match searchable_attributes { + Some(attrs) => Setting::Set(attrs), + None => Setting::Reset, + }, + filterable_attributes: Setting::Set(filterable_attributes), + sortable_attributes: Setting::Set(sortable_attributes), + ranking_rules: Setting::Set(criteria), + stop_words: Setting::Set(stop_words), + distinct_attribute: match distinct_field { + Some(field) => Setting::Set(field), + None => Setting::Reset, + }, + synonyms: Setting::Set(synonyms), + typo_tolerance: Setting::Set(typo_tolerance), + faceting: Setting::Set(faceting), + pagination: Setting::Set(pagination), + _kind: PhantomData, + }) + } + + /// Return the total number of documents contained in the index + the selected documents. + pub fn retrieve_documents>( + &self, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result<(u64, Vec)> { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); + + let mut documents = Vec::new(); + for entry in self.all_documents(&txn)?.skip(offset).take(limit) { + let (_id, obkv) = entry?; + let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?; + let document = match &attributes_to_retrieve { + Some(attributes_to_retrieve) => permissive_json_pointer::select_values( + &document, + attributes_to_retrieve.iter().map(|s| s.as_ref()), + ), + None => document, + }; + documents.push(document); + } + + let number_of_documents = self.number_of_documents(&txn)?; + + Ok((number_of_documents, documents)) + } + + pub fn retrieve_document>( + &self, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); + + let internal_id = self + .external_documents_ids(&txn)? + .get(doc_id.as_bytes()) + .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; + + let document = self + .documents(&txn, std::iter::once(internal_id))? 
+ .into_iter() + .next() + .map(|(_, d)| d) + .ok_or(IndexError::DocumentNotFound(doc_id))?; + + let document = obkv_to_json(&all_fields, &fields_ids_map, document)?; + let document = match &attributes_to_retrieve { + Some(attributes_to_retrieve) => permissive_json_pointer::select_values( + &document, + attributes_to_retrieve.iter().map(|s| s.as_ref()), + ), + None => document, + }; + + Ok(document) + } + + pub fn size(&self) -> Result { + self.inner.on_disk_size() + } + + pub fn snapshot(&self, path: impl AsRef) -> Result<()> { + let mut dst = path.as_ref().join(format!("indexes/{}/", self.name)); + create_dir_all(&dst)?; + dst.push("data.mdb"); + let _txn = self.write_txn()?; + self.inner.copy_to_path(dst, CompactionOption::Enabled)?; + Ok(()) + } +} + +/// When running tests, when a server instance is dropped, the environment is not actually closed, +/// leaving a lot of open file descriptors. +impl Drop for Index { + fn drop(&mut self) { + // When dropping the last instance of an index, we want to close the index + // Note that the close is actually performed only if all the instances a effectively + // dropped + + if Arc::strong_count(&self.inner) == 1 { + self.inner.as_ref().clone().prepare_for_closing(); + } + } +} diff --git a/index-scheduler/src/index/mod.rs b/index-scheduler/src/index/mod.rs new file mode 100644 index 000000000..98c25366d --- /dev/null +++ b/index-scheduler/src/index/mod.rs @@ -0,0 +1,249 @@ +pub use search::{ + MatchingStrategy, SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, + DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, +}; +pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked}; + +mod dump; +pub mod error; +mod search; +pub mod updates; + +#[allow(clippy::module_inception)] +mod index; + +pub use index::{Document, IndexMeta, IndexStats}; + +#[cfg(not(test))] +pub use index::Index; + +#[cfg(test)] +pub use test::MockIndex as Index; + +/// The index::test module provides means of mocking an index instance. I can be used throughout the +/// code for unit testing, in places where an index would normally be used. 
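+/// When compiled for tests, `Index` resolves to `MockIndex`, which either wraps a real index or stubs individual methods through a `nelson::Mocker`.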
+#[cfg(test)] +pub mod test { + use std::path::{Path, PathBuf}; + use std::sync::Arc; + + use milli::update::{ + DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig, + }; + use nelson::Mocker; + use uuid::Uuid; + + use super::error::Result; + use super::index::Index; + use super::Document; + use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings}; + use crate::update_file_store::UpdateFileStore; + + #[derive(Clone)] + pub enum MockIndex { + Real(Index), + Mock(Arc), + } + + impl MockIndex { + pub fn mock(mocker: Mocker) -> Self { + Self::Mock(Arc::new(mocker)) + } + + pub fn open( + path: impl AsRef, + size: usize, + uuid: Uuid, + update_handler: Arc, + ) -> Result { + let index = Index::open(path, size, uuid, update_handler)?; + Ok(Self::Real(index)) + } + + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + size: usize, + update_handler: &IndexerConfig, + ) -> anyhow::Result<()> { + Index::load_dump(src, dst, size, update_handler) + } + + pub fn uuid(&self) -> Uuid { + match self { + MockIndex::Real(index) => index.uuid(), + MockIndex::Mock(m) => unsafe { m.get("uuid").call(()) }, + } + } + + pub fn stats(&self) -> Result { + match self { + MockIndex::Real(index) => index.stats(), + MockIndex::Mock(m) => unsafe { m.get("stats").call(()) }, + } + } + + pub fn meta(&self) -> Result { + match self { + MockIndex::Real(index) => index.meta(), + MockIndex::Mock(_) => todo!(), + } + } + pub fn settings(&self) -> Result> { + match self { + MockIndex::Real(index) => index.settings(), + MockIndex::Mock(_) => todo!(), + } + } + + pub fn retrieve_documents>( + &self, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result<(u64, Vec)> { + match self { + MockIndex::Real(index) => { + index.retrieve_documents(offset, limit, attributes_to_retrieve) + } + MockIndex::Mock(_) => todo!(), + } + } + + pub fn retrieve_document>( + &self, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + match self { + MockIndex::Real(index) => index.retrieve_document(doc_id, attributes_to_retrieve), + MockIndex::Mock(_) => todo!(), + } + } + + pub fn size(&self) -> u64 { + match self { + MockIndex::Real(index) => index.size(), + MockIndex::Mock(_) => todo!(), + } + } + + pub fn snapshot(&self, path: impl AsRef) -> Result<()> { + match self { + MockIndex::Real(index) => index.snapshot(path), + MockIndex::Mock(m) => unsafe { m.get("snapshot").call(path.as_ref()) }, + } + } + + pub fn close(self) { + match self { + MockIndex::Real(index) => index.close(), + MockIndex::Mock(m) => unsafe { m.get("close").call(()) }, + } + } + + pub fn perform_search(&self, query: SearchQuery) -> Result { + match self { + MockIndex::Real(index) => index.perform_search(query), + MockIndex::Mock(m) => unsafe { m.get("perform_search").call(query) }, + } + } + + pub fn dump(&self, path: impl AsRef) -> Result<()> { + match self { + MockIndex::Real(index) => index.dump(path), + MockIndex::Mock(m) => unsafe { m.get("dump").call(path.as_ref()) }, + } + } + + pub fn update_documents( + &self, + method: IndexDocumentsMethod, + primary_key: Option, + file_store: UpdateFileStore, + contents: impl Iterator, + ) -> Result>> { + match self { + MockIndex::Real(index) => { + index.update_documents(method, primary_key, file_store, contents) + } + MockIndex::Mock(mocker) => unsafe { + mocker + .get("update_documents") + .call((method, primary_key, file_store, contents)) + }, + } + } + + pub fn update_settings(&self, settings: &Settings) -> Result<()> { + 
match self { + MockIndex::Real(index) => index.update_settings(settings), + MockIndex::Mock(m) => unsafe { m.get("update_settings").call(settings) }, + } + } + + pub fn update_primary_key(&self, primary_key: String) -> Result { + match self { + MockIndex::Real(index) => index.update_primary_key(primary_key), + MockIndex::Mock(m) => unsafe { m.get("update_primary_key").call(primary_key) }, + } + } + + pub fn delete_documents(&self, ids: &[String]) -> Result { + match self { + MockIndex::Real(index) => index.delete_documents(ids), + MockIndex::Mock(m) => unsafe { m.get("delete_documents").call(ids) }, + } + } + + pub fn clear_documents(&self) -> Result<()> { + match self { + MockIndex::Real(index) => index.clear_documents(), + MockIndex::Mock(m) => unsafe { m.get("clear_documents").call(()) }, + } + } + } + + #[test] + fn test_faux_index() { + let faux = Mocker::default(); + faux.when("snapshot") + .times(2) + .then(|_: &Path| -> Result<()> { Ok(()) }); + + let index = MockIndex::mock(faux); + + let path = PathBuf::from("hello"); + index.snapshot(&path).unwrap(); + index.snapshot(&path).unwrap(); + } + + #[test] + #[should_panic] + fn test_faux_unexisting_method_stub() { + let faux = Mocker::default(); + + let index = MockIndex::mock(faux); + + let path = PathBuf::from("hello"); + index.snapshot(&path).unwrap(); + index.snapshot(&path).unwrap(); + } + + #[test] + #[should_panic] + fn test_faux_panic() { + let faux = Mocker::default(); + faux.when("snapshot") + .times(2) + .then(|_: &Path| -> Result<()> { + panic!(); + }); + + let index = MockIndex::mock(faux); + + let path = PathBuf::from("hello"); + index.snapshot(&path).unwrap(); + index.snapshot(&path).unwrap(); + } +} diff --git a/index-scheduler/src/index/search.rs b/index-scheduler/src/index/search.rs new file mode 100644 index 000000000..57171d529 --- /dev/null +++ b/index-scheduler/src/index/search.rs @@ -0,0 +1,688 @@ +use std::cmp::min; +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::str::FromStr; +use std::time::Instant; + +use either::Either; +use milli::tokenizer::TokenizerBuilder; +use milli::{ + AscDesc, FieldId, FieldsIdsMap, Filter, FormatOptions, MatchBounds, MatcherBuilder, SortError, + TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET, +}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::index::error::FacetError; + +use super::error::{IndexError, Result}; +use super::index::Index; + +pub type Document = serde_json::Map; +type MatchesPosition = BTreeMap>; + +pub const DEFAULT_SEARCH_LIMIT: fn() -> usize = || 20; +pub const DEFAULT_CROP_LENGTH: fn() -> usize = || 10; +pub const DEFAULT_CROP_MARKER: fn() -> String = || "…".to_string(); +pub const DEFAULT_HIGHLIGHT_PRE_TAG: fn() -> String = || "".to_string(); +pub const DEFAULT_HIGHLIGHT_POST_TAG: fn() -> String = || "".to_string(); + +/// The maximimum number of results that the engine +/// will be able to return in one search call. 
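+/// The limit can be raised or lowered per index through the `pagination.maxTotalHits` setting.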
+pub const DEFAULT_PAGINATION_MAX_TOTAL_HITS: usize = 1000; + +#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct SearchQuery { + pub q: Option, + pub offset: Option, + #[serde(default = "DEFAULT_SEARCH_LIMIT")] + pub limit: usize, + pub attributes_to_retrieve: Option>, + pub attributes_to_crop: Option>, + #[serde(default = "DEFAULT_CROP_LENGTH")] + pub crop_length: usize, + pub attributes_to_highlight: Option>, + // Default to false + #[serde(default = "Default::default")] + pub show_matches_position: bool, + pub filter: Option, + pub sort: Option>, + pub facets: Option>, + #[serde(default = "DEFAULT_HIGHLIGHT_PRE_TAG")] + pub highlight_pre_tag: String, + #[serde(default = "DEFAULT_HIGHLIGHT_POST_TAG")] + pub highlight_post_tag: String, + #[serde(default = "DEFAULT_CROP_MARKER")] + pub crop_marker: String, + #[serde(default)] + pub matching_strategy: MatchingStrategy, +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum MatchingStrategy { + /// Remove query words from last to first + Last, + /// All query words are mandatory + All, +} + +impl Default for MatchingStrategy { + fn default() -> Self { + Self::Last + } +} + +impl From for TermsMatchingStrategy { + fn from(other: MatchingStrategy) -> Self { + match other { + MatchingStrategy::Last => Self::Last, + MatchingStrategy::All => Self::All, + } + } +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +pub struct SearchHit { + #[serde(flatten)] + pub document: Document, + #[serde(rename = "_formatted", skip_serializing_if = "Document::is_empty")] + pub formatted: Document, + #[serde(rename = "_matchesPosition", skip_serializing_if = "Option::is_none")] + pub matches_position: Option, +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SearchResult { + pub hits: Vec, + pub estimated_total_hits: u64, + pub query: String, + pub limit: usize, + pub offset: usize, + pub processing_time_ms: u128, + #[serde(skip_serializing_if = "Option::is_none")] + pub facet_distribution: Option>>, +} + +impl Index { + pub fn perform_search(&self, query: SearchQuery) -> Result { + let before_search = Instant::now(); + let rtxn = self.read_txn()?; + + let mut search = self.search(&rtxn); + + if let Some(ref query) = query.q { + search.query(query); + } + + search.terms_matching_strategy(query.matching_strategy.into()); + + let max_total_hits = self + .pagination_max_total_hits(&rtxn)? + .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS); + + // Make sure that a user can't get more documents than the hard limit, + // we align that on the offset too. + let offset = min(query.offset.unwrap_or(0), max_total_hits); + let limit = min(query.limit, max_total_hits.saturating_sub(offset)); + + search.offset(offset); + search.limit(limit); + + if let Some(ref filter) = query.filter { + if let Some(facets) = parse_filter(filter)? { + search.filter(facets); + } + } + + if let Some(ref sort) = query.sort { + let sort = match sort.iter().map(|s| AscDesc::from_str(s)).collect() { + Ok(sorts) => sorts, + Err(asc_desc_error) => { + return Err(IndexError::Milli(SortError::from(asc_desc_error).into())) + } + }; + + search.sort_criteria(sort); + } + + let milli::SearchResult { + documents_ids, + matching_words, + candidates, + .. + } = search.execute()?; + + let fields_ids_map = self.fields_ids_map(&rtxn).unwrap(); + + let displayed_ids = self + .displayed_fields_ids(&rtxn)? 
+ .map(|fields| fields.into_iter().collect::>()) + .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect()); + + let fids = |attrs: &BTreeSet| { + let mut ids = BTreeSet::new(); + for attr in attrs { + if attr == "*" { + ids = displayed_ids.clone(); + break; + } + + if let Some(id) = fields_ids_map.id(attr) { + ids.insert(id); + } + } + ids + }; + + // The attributes to retrieve are the ones explicitly marked as to retrieve (all by default), + // but these attributes must be also be present + // - in the fields_ids_map + // - in the the displayed attributes + let to_retrieve_ids: BTreeSet<_> = query + .attributes_to_retrieve + .as_ref() + .map(fids) + .unwrap_or_else(|| displayed_ids.clone()) + .intersection(&displayed_ids) + .cloned() + .collect(); + + let attr_to_highlight = query.attributes_to_highlight.unwrap_or_default(); + + let attr_to_crop = query.attributes_to_crop.unwrap_or_default(); + + // Attributes in `formatted_options` correspond to the attributes that will be in `_formatted` + // These attributes are: + // - the attributes asked to be highlighted or cropped (with `attributesToCrop` or `attributesToHighlight`) + // - the attributes asked to be retrieved: these attributes will not be highlighted/cropped + // But these attributes must be also present in displayed attributes + let formatted_options = compute_formatted_options( + &attr_to_highlight, + &attr_to_crop, + query.crop_length, + &to_retrieve_ids, + &fields_ids_map, + &displayed_ids, + ); + + let tokenizer = TokenizerBuilder::default().build(); + + let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer); + formatter_builder.crop_marker(query.crop_marker); + formatter_builder.highlight_prefix(query.highlight_pre_tag); + formatter_builder.highlight_suffix(query.highlight_post_tag); + + let mut documents = Vec::new(); + + let documents_iter = self.documents(&rtxn, documents_ids)?; + + for (_id, obkv) in documents_iter { + // First generate a document with all the displayed fields + let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?; + + // select the attributes to retrieve + let attributes_to_retrieve = to_retrieve_ids + .iter() + .map(|&fid| fields_ids_map.name(fid).expect("Missing field name")); + let mut document = + permissive_json_pointer::select_values(&displayed_document, attributes_to_retrieve); + + let (matches_position, formatted) = format_fields( + &displayed_document, + &fields_ids_map, + &formatter_builder, + &formatted_options, + query.show_matches_position, + &displayed_ids, + )?; + + if let Some(sort) = query.sort.as_ref() { + insert_geo_distance(sort, &mut document); + } + + let hit = SearchHit { + document, + formatted, + matches_position, + }; + documents.push(hit); + } + + let estimated_total_hits = candidates.len(); + + let facet_distribution = match query.facets { + Some(ref fields) => { + let mut facet_distribution = self.facets_distribution(&rtxn); + + let max_values_by_facet = self + .max_values_per_facet(&rtxn)? 
+ .unwrap_or(DEFAULT_VALUES_PER_FACET); + facet_distribution.max_values_per_facet(max_values_by_facet); + + if fields.iter().all(|f| f != "*") { + facet_distribution.facets(fields); + } + let distribution = facet_distribution.candidates(candidates).execute()?; + + Some(distribution) + } + None => None, + }; + + let result = SearchResult { + hits: documents, + estimated_total_hits, + query: query.q.clone().unwrap_or_default(), + limit: query.limit, + offset: query.offset.unwrap_or_default(), + processing_time_ms: before_search.elapsed().as_millis(), + facet_distribution, + }; + Ok(result) + } +} + +fn insert_geo_distance(sorts: &[String], document: &mut Document) { + lazy_static::lazy_static! { + static ref GEO_REGEX: Regex = + Regex::new(r"_geoPoint\(\s*([[:digit:].\-]+)\s*,\s*([[:digit:].\-]+)\s*\)").unwrap(); + }; + if let Some(capture_group) = sorts.iter().find_map(|sort| GEO_REGEX.captures(sort)) { + // TODO: TAMO: milli encountered an internal error, what do we want to do? + let base = [ + capture_group[1].parse().unwrap(), + capture_group[2].parse().unwrap(), + ]; + let geo_point = &document.get("_geo").unwrap_or(&json!(null)); + if let Some((lat, lng)) = geo_point["lat"].as_f64().zip(geo_point["lng"].as_f64()) { + let distance = milli::distance_between_two_points(&base, &[lat, lng]); + document.insert("_geoDistance".to_string(), json!(distance.round() as usize)); + } + } +} + +fn compute_formatted_options( + attr_to_highlight: &HashSet, + attr_to_crop: &[String], + query_crop_length: usize, + to_retrieve_ids: &BTreeSet, + fields_ids_map: &FieldsIdsMap, + displayed_ids: &BTreeSet, +) -> BTreeMap { + let mut formatted_options = BTreeMap::new(); + + add_highlight_to_formatted_options( + &mut formatted_options, + attr_to_highlight, + fields_ids_map, + displayed_ids, + ); + + add_crop_to_formatted_options( + &mut formatted_options, + attr_to_crop, + query_crop_length, + fields_ids_map, + displayed_ids, + ); + + // Should not return `_formatted` if no valid attributes to highlight/crop + if !formatted_options.is_empty() { + add_non_formatted_ids_to_formatted_options(&mut formatted_options, to_retrieve_ids); + } + + formatted_options +} + +fn add_highlight_to_formatted_options( + formatted_options: &mut BTreeMap, + attr_to_highlight: &HashSet, + fields_ids_map: &FieldsIdsMap, + displayed_ids: &BTreeSet, +) { + for attr in attr_to_highlight { + let new_format = FormatOptions { + highlight: true, + crop: None, + }; + + if attr == "*" { + for id in displayed_ids { + formatted_options.insert(*id, new_format); + } + break; + } + + if let Some(id) = fields_ids_map.id(attr) { + if displayed_ids.contains(&id) { + formatted_options.insert(id, new_format); + } + } + } +} + +fn add_crop_to_formatted_options( + formatted_options: &mut BTreeMap, + attr_to_crop: &[String], + crop_length: usize, + fields_ids_map: &FieldsIdsMap, + displayed_ids: &BTreeSet, +) { + for attr in attr_to_crop { + let mut split = attr.rsplitn(2, ':'); + let (attr_name, attr_len) = match split.next().zip(split.next()) { + Some((len, name)) => { + let crop_len = len.parse::().unwrap_or(crop_length); + (name, crop_len) + } + None => (attr.as_str(), crop_length), + }; + + if attr_name == "*" { + for id in displayed_ids { + formatted_options + .entry(*id) + .and_modify(|f| f.crop = Some(attr_len)) + .or_insert(FormatOptions { + highlight: false, + crop: Some(attr_len), + }); + } + } + + if let Some(id) = fields_ids_map.id(attr_name) { + if displayed_ids.contains(&id) { + formatted_options + .entry(id) + .and_modify(|f| f.crop = 
Some(attr_len)) + .or_insert(FormatOptions { + highlight: false, + crop: Some(attr_len), + }); + } + } + } +} + +fn add_non_formatted_ids_to_formatted_options( + formatted_options: &mut BTreeMap, + to_retrieve_ids: &BTreeSet, +) { + for id in to_retrieve_ids { + formatted_options.entry(*id).or_insert(FormatOptions { + highlight: false, + crop: None, + }); + } +} + +fn make_document( + displayed_attributes: &BTreeSet, + field_ids_map: &FieldsIdsMap, + obkv: obkv::KvReaderU16, +) -> Result { + let mut document = serde_json::Map::new(); + + // recreate the original json + for (key, value) in obkv.iter() { + let value = serde_json::from_slice(value)?; + let key = field_ids_map + .name(key) + .expect("Missing field name") + .to_string(); + + document.insert(key, value); + } + + // select the attributes to retrieve + let displayed_attributes = displayed_attributes + .iter() + .map(|&fid| field_ids_map.name(fid).expect("Missing field name")); + + let document = permissive_json_pointer::select_values(&document, displayed_attributes); + Ok(document) +} + +fn format_fields<'a, A: AsRef<[u8]>>( + document: &Document, + field_ids_map: &FieldsIdsMap, + builder: &MatcherBuilder<'a, A>, + formatted_options: &BTreeMap, + compute_matches: bool, + displayable_ids: &BTreeSet, +) -> Result<(Option, Document)> { + let mut matches_position = compute_matches.then(BTreeMap::new); + let mut document = document.clone(); + + // select the attributes to retrieve + let displayable_names = displayable_ids + .iter() + .map(|&fid| field_ids_map.name(fid).expect("Missing field name")); + permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| { + // To get the formatting option of each key we need to see all the rules that applies + // to the value and merge them together. eg. If a user said he wanted to highlight `doggo` + // and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only + // highlighted. + let format = formatted_options + .iter() + .filter(|(field, _option)| { + let name = field_ids_map.name(**field).unwrap(); + milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name) + }) + .map(|(_, option)| *option) + .reduce(|acc, option| acc.merge(option)); + let mut infos = Vec::new(); + + *value = format_value( + std::mem::take(value), + builder, + format, + &mut infos, + compute_matches, + ); + + if let Some(matches) = matches_position.as_mut() { + if !infos.is_empty() { + matches.insert(key.to_owned(), infos); + } + } + }); + + let selectors = formatted_options + .keys() + // This unwrap must be safe since we got the ids from the fields_ids_map just + // before. 
+ .map(|&fid| field_ids_map.name(fid).unwrap()); + let document = permissive_json_pointer::select_values(&document, selectors); + + Ok((matches_position, document)) +} + +fn format_value<'a, A: AsRef<[u8]>>( + value: Value, + builder: &MatcherBuilder<'a, A>, + format_options: Option, + infos: &mut Vec, + compute_matches: bool, +) -> Value { + match value { + Value::String(old_string) => { + let mut matcher = builder.build(&old_string); + if compute_matches { + let matches = matcher.matches(); + infos.extend_from_slice(&matches[..]); + } + + match format_options { + Some(format_options) => { + let value = matcher.format(format_options); + Value::String(value.into_owned()) + } + None => Value::String(old_string), + } + } + Value::Array(values) => Value::Array( + values + .into_iter() + .map(|v| { + format_value( + v, + builder, + format_options.map(|format_options| FormatOptions { + highlight: format_options.highlight, + crop: None, + }), + infos, + compute_matches, + ) + }) + .collect(), + ), + Value::Object(object) => Value::Object( + object + .into_iter() + .map(|(k, v)| { + ( + k, + format_value( + v, + builder, + format_options.map(|format_options| FormatOptions { + highlight: format_options.highlight, + crop: None, + }), + infos, + compute_matches, + ), + ) + }) + .collect(), + ), + Value::Number(number) => { + let s = number.to_string(); + + let mut matcher = builder.build(&s); + if compute_matches { + let matches = matcher.matches(); + infos.extend_from_slice(&matches[..]); + } + + match format_options { + Some(format_options) => { + let value = matcher.format(format_options); + Value::String(value.into_owned()) + } + None => Value::Number(number), + } + } + value => value, + } +} + +fn parse_filter(facets: &Value) -> Result> { + match facets { + Value::String(expr) => { + let condition = Filter::from_str(expr)?; + Ok(condition) + } + Value::Array(arr) => parse_filter_array(arr), + v => Err(FacetError::InvalidExpression(&["Array"], v.clone()).into()), + } +} + +fn parse_filter_array(arr: &[Value]) -> Result> { + let mut ands = Vec::new(); + for value in arr { + match value { + Value::String(s) => ands.push(Either::Right(s.as_str())), + Value::Array(arr) => { + let mut ors = Vec::new(); + for value in arr { + match value { + Value::String(s) => ors.push(s.as_str()), + v => { + return Err(FacetError::InvalidExpression(&["String"], v.clone()).into()) + } + } + } + ands.push(Either::Left(ors)); + } + v => { + return Err( + FacetError::InvalidExpression(&["String", "[String]"], v.clone()).into(), + ) + } + } + } + + Ok(Filter::from_array(ands)?) 
+} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_insert_geo_distance() { + let value: Document = serde_json::from_str( + r#"{ + "_geo": { + "lat": 50.629973371633746, + "lng": 3.0569447399419567 + }, + "city": "Lille", + "id": "1" + }"#, + ) + .unwrap(); + + let sorters = &["_geoPoint(50.629973371633746,3.0569447399419567):desc".to_string()]; + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + let sorters = &["_geoPoint(50.629973371633746, 3.0569447399419567):asc".to_string()]; + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + let sorters = + &["_geoPoint( 50.629973371633746 , 3.0569447399419567 ):desc".to_string()]; + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + let sorters = &[ + "prix:asc", + "villeneuve:desc", + "_geoPoint(50.629973371633746, 3.0569447399419567):asc", + "ubu:asc", + ] + .map(|s| s.to_string()); + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + // only the first geoPoint is used to compute the distance + let sorters = &[ + "chien:desc", + "_geoPoint(50.629973371633746, 3.0569447399419567):asc", + "pangolin:desc", + "_geoPoint(100.0, -80.0):asc", + "chat:asc", + ] + .map(|s| s.to_string()); + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + // there was no _geoPoint so nothing is inserted in the document + let sorters = &["chien:asc".to_string()]; + let mut document = value; + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), None); + } +} diff --git a/index-scheduler/src/index/updates.rs b/index-scheduler/src/index/updates.rs new file mode 100644 index 000000000..1361cb919 --- /dev/null +++ b/index-scheduler/src/index/updates.rs @@ -0,0 +1,562 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::marker::PhantomData; +use std::num::NonZeroUsize; + +use log::{debug, info, trace}; +use milli::documents::DocumentsBatchReader; +use milli::update::{ + DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, + Setting, +}; +use serde::{Deserialize, Serialize, Serializer}; +use uuid::Uuid; + +use super::error::{IndexError, Result}; +use super::index::{Index, IndexMeta}; +use crate::update_file_store::UpdateFileStore; + +fn serialize_with_wildcard( + field: &Setting>, + s: S, +) -> std::result::Result +where + S: Serializer, +{ + let wildcard = vec!["*".to_string()]; + match field { + Setting::Set(value) => Some(value), + Setting::Reset => Some(&wildcard), + Setting::NotSet => None, + } + .serialize(s) +} + +#[derive(Clone, Default, Debug, Serialize, PartialEq, Eq)] +pub struct Checked; + +#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct Unchecked; + +#[cfg_attr(test, derive(proptest_derive::Arbitrary))] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct MinWordSizeTyposSetting { + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub one_typo: Setting, + #[cfg_attr(test, proptest(strategy 
= "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub two_typos: Setting, +} + +#[cfg_attr(test, derive(proptest_derive::Arbitrary))] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct TypoSettings { + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub enabled: Setting, + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub min_word_size_for_typos: Setting, + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub disable_on_words: Setting>, + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub disable_on_attributes: Setting>, +} + +#[cfg_attr(test, derive(proptest_derive::Arbitrary))] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct FacetingSettings { + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub max_values_per_facet: Setting, +} + +#[cfg_attr(test, derive(proptest_derive::Arbitrary))] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct PaginationSettings { + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + pub max_total_hits: Setting, +} + +/// Holds all the settings for an index. `T` can either be `Checked` if they represents settings +/// whose validity is guaranteed, or `Unchecked` if they need to be validated. In the later case, a +/// call to `check` will return a `Settings` from a `Settings`. 
+#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +#[serde(bound(serialize = "T: Serialize", deserialize = "T: Deserialize<'static>"))] +#[cfg_attr(test, derive(proptest_derive::Arbitrary))] +pub struct Settings { + #[serde( + default, + serialize_with = "serialize_with_wildcard", + skip_serializing_if = "Setting::is_not_set" + )] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub displayed_attributes: Setting>, + + #[serde( + default, + serialize_with = "serialize_with_wildcard", + skip_serializing_if = "Setting::is_not_set" + )] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub searchable_attributes: Setting>, + + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub filterable_attributes: Setting>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub sortable_attributes: Setting>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub ranking_rules: Setting>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub stop_words: Setting>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub synonyms: Setting>>, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub distinct_attribute: Setting, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub typo_tolerance: Setting, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub faceting: Setting, + #[serde(default, skip_serializing_if = "Setting::is_not_set")] + #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] + pub pagination: Setting, + + #[serde(skip)] + pub _kind: PhantomData, +} + +impl Settings { + pub fn cleared() -> Settings { + Settings { + displayed_attributes: Setting::Reset, + searchable_attributes: Setting::Reset, + filterable_attributes: Setting::Reset, + sortable_attributes: Setting::Reset, + ranking_rules: Setting::Reset, + stop_words: Setting::Reset, + synonyms: Setting::Reset, + distinct_attribute: Setting::Reset, + typo_tolerance: Setting::Reset, + faceting: Setting::Reset, + pagination: Setting::Reset, + _kind: PhantomData, + } + } + + pub fn into_unchecked(self) -> Settings { + let Self { + displayed_attributes, + searchable_attributes, + filterable_attributes, + sortable_attributes, + ranking_rules, + stop_words, + synonyms, + distinct_attribute, + typo_tolerance, + faceting, + pagination, + .. 
+ } = self; + + Settings { + displayed_attributes, + searchable_attributes, + filterable_attributes, + sortable_attributes, + ranking_rules, + stop_words, + synonyms, + distinct_attribute, + typo_tolerance, + faceting, + pagination, + _kind: PhantomData, + } + } +} + +impl Settings { + pub fn check(self) -> Settings { + let displayed_attributes = match self.displayed_attributes { + Setting::Set(fields) => { + if fields.iter().any(|f| f == "*") { + Setting::Reset + } else { + Setting::Set(fields) + } + } + otherwise => otherwise, + }; + + let searchable_attributes = match self.searchable_attributes { + Setting::Set(fields) => { + if fields.iter().any(|f| f == "*") { + Setting::Reset + } else { + Setting::Set(fields) + } + } + otherwise => otherwise, + }; + + Settings { + displayed_attributes, + searchable_attributes, + filterable_attributes: self.filterable_attributes, + sortable_attributes: self.sortable_attributes, + ranking_rules: self.ranking_rules, + stop_words: self.stop_words, + synonyms: self.synonyms, + distinct_attribute: self.distinct_attribute, + typo_tolerance: self.typo_tolerance, + faceting: self.faceting, + pagination: self.pagination, + _kind: PhantomData, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +pub struct Facets { + pub level_group_size: Option, + pub min_level_size: Option, +} + +impl Index { + fn update_primary_key_txn<'a, 'b>( + &'a self, + txn: &mut milli::heed::RwTxn<'a, 'b>, + primary_key: String, + ) -> Result { + let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref()); + builder.set_primary_key(primary_key); + builder.execute(|_| ())?; + let meta = IndexMeta::new_txn(self, txn)?; + + Ok(meta) + } + + pub fn update_primary_key(&self, primary_key: String) -> Result { + let mut txn = self.write_txn()?; + let res = self.update_primary_key_txn(&mut txn, primary_key)?; + txn.commit()?; + + Ok(res) + } + + /// Deletes `ids` from the index, and returns how many documents were deleted. 
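+ /// Ids that do not exist in the index are simply ignored.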
+ pub fn delete_documents(&self, ids: &[String]) -> Result { + let mut txn = self.write_txn()?; + let mut builder = milli::update::DeleteDocuments::new(&mut txn, self)?; + + // We ignore unexisting document ids + ids.iter().for_each(|id| { + builder.delete_external_id(id); + }); + + let deleted = builder.execute()?; + + txn.commit()?; + + Ok(deleted) + } + + pub fn clear_documents(&self) -> Result<()> { + let mut txn = self.write_txn()?; + milli::update::ClearDocuments::new(&mut txn, self).execute()?; + txn.commit()?; + + Ok(()) + } + + pub fn update_documents( + &self, + method: IndexDocumentsMethod, + primary_key: Option, + file_store: UpdateFileStore, + contents: impl IntoIterator, + ) -> Result>> { + trace!("performing document addition"); + let mut txn = self.write_txn()?; + + if let Some(primary_key) = primary_key { + if self.primary_key(&txn)?.is_none() { + self.update_primary_key_txn(&mut txn, primary_key)?; + } + } + + let config = IndexDocumentsConfig { + update_method: method, + ..Default::default() + }; + + let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step); + let mut builder = milli::update::IndexDocuments::new( + &mut txn, + self, + self.indexer_config.as_ref(), + config, + indexing_callback, + )?; + + let mut results = Vec::new(); + for content_uuid in contents.into_iter() { + let content_file = file_store.get_update(content_uuid)?; + let reader = DocumentsBatchReader::from_reader(content_file)?; + let (new_builder, user_result) = builder.add_documents(reader)?; + builder = new_builder; + + let user_result = match user_result { + Ok(count) => { + let addition = DocumentAdditionResult { + indexed_documents: count, + number_of_documents: count, + }; + info!("document addition done: {:?}", addition); + Ok(addition) + } + Err(e) => Err(IndexError::from(e)), + }; + + results.push(user_result); + } + + if results.iter().any(Result::is_ok) { + let _addition = builder.execute()?; + txn.commit()?; + } + + Ok(results) + } + + pub fn update_settings(&self, settings: &Settings) -> Result<()> { + // We must use the write transaction of the update here. 
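+ // the settings update and the commit are atomic: if `execute` fails, the transaction is dropped and nothing is persisted.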
+ let mut txn = self.write_txn()?; + let mut builder = + milli::update::Settings::new(&mut txn, self, self.indexer_config.as_ref()); + + apply_settings_to_builder(settings, &mut builder); + + builder.execute(|indexing_step| debug!("update: {:?}", indexing_step))?; + + txn.commit()?; + + Ok(()) + } +} + +pub fn apply_settings_to_builder( + settings: &Settings, + builder: &mut milli::update::Settings, +) { + match settings.searchable_attributes { + Setting::Set(ref names) => builder.set_searchable_fields(names.clone()), + Setting::Reset => builder.reset_searchable_fields(), + Setting::NotSet => (), + } + + match settings.displayed_attributes { + Setting::Set(ref names) => builder.set_displayed_fields(names.clone()), + Setting::Reset => builder.reset_displayed_fields(), + Setting::NotSet => (), + } + + match settings.filterable_attributes { + Setting::Set(ref facets) => { + builder.set_filterable_fields(facets.clone().into_iter().collect()) + } + Setting::Reset => builder.reset_filterable_fields(), + Setting::NotSet => (), + } + + match settings.sortable_attributes { + Setting::Set(ref fields) => builder.set_sortable_fields(fields.iter().cloned().collect()), + Setting::Reset => builder.reset_sortable_fields(), + Setting::NotSet => (), + } + + match settings.ranking_rules { + Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()), + Setting::Reset => builder.reset_criteria(), + Setting::NotSet => (), + } + + match settings.stop_words { + Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()), + Setting::Reset => builder.reset_stop_words(), + Setting::NotSet => (), + } + + match settings.synonyms { + Setting::Set(ref synonyms) => builder.set_synonyms(synonyms.clone().into_iter().collect()), + Setting::Reset => builder.reset_synonyms(), + Setting::NotSet => (), + } + + match settings.distinct_attribute { + Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()), + Setting::Reset => builder.reset_distinct_field(), + Setting::NotSet => (), + } + + match settings.typo_tolerance { + Setting::Set(ref value) => { + match value.enabled { + Setting::Set(val) => builder.set_autorize_typos(val), + Setting::Reset => builder.reset_authorize_typos(), + Setting::NotSet => (), + } + + match value.min_word_size_for_typos { + Setting::Set(ref setting) => { + match setting.one_typo { + Setting::Set(val) => builder.set_min_word_len_one_typo(val), + Setting::Reset => builder.reset_min_word_len_one_typo(), + Setting::NotSet => (), + } + match setting.two_typos { + Setting::Set(val) => builder.set_min_word_len_two_typos(val), + Setting::Reset => builder.reset_min_word_len_two_typos(), + Setting::NotSet => (), + } + } + Setting::Reset => { + builder.reset_min_word_len_one_typo(); + builder.reset_min_word_len_two_typos(); + } + Setting::NotSet => (), + } + + match value.disable_on_words { + Setting::Set(ref words) => { + builder.set_exact_words(words.clone()); + } + Setting::Reset => builder.reset_exact_words(), + Setting::NotSet => (), + } + + match value.disable_on_attributes { + Setting::Set(ref words) => { + builder.set_exact_attributes(words.iter().cloned().collect()) + } + Setting::Reset => builder.reset_exact_attributes(), + Setting::NotSet => (), + } + } + Setting::Reset => { + // all typo settings need to be reset here. 
+            builder.reset_authorize_typos();
+            builder.reset_min_word_len_one_typo();
+            builder.reset_min_word_len_two_typos();
+            builder.reset_exact_words();
+            builder.reset_exact_attributes();
+        }
+        Setting::NotSet => (),
+    }
+
+    match settings.faceting {
+        Setting::Set(ref value) => match value.max_values_per_facet {
+            Setting::Set(val) => builder.set_max_values_per_facet(val),
+            Setting::Reset => builder.reset_max_values_per_facet(),
+            Setting::NotSet => (),
+        },
+        Setting::Reset => builder.reset_max_values_per_facet(),
+        Setting::NotSet => (),
+    }
+
+    match settings.pagination {
+        Setting::Set(ref value) => match value.max_total_hits {
+            Setting::Set(val) => builder.set_pagination_max_total_hits(val),
+            Setting::Reset => builder.reset_pagination_max_total_hits(),
+            Setting::NotSet => (),
+        },
+        Setting::Reset => builder.reset_pagination_max_total_hits(),
+        Setting::NotSet => (),
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+    use proptest::prelude::*;
+
+    use super::*;
+
+    pub(super) fn setting_strategy<T: Arbitrary + Clone>() -> impl Strategy<Value = Setting<T>> {
+        prop_oneof![
+            Just(Setting::NotSet),
+            Just(Setting::Reset),
+            any::<T>().prop_map(Setting::Set)
+        ]
+    }
+
+    #[test]
+    fn test_setting_check() {
+        // test no changes
+        let settings = Settings {
+            displayed_attributes: Setting::Set(vec![String::from("hello")]),
+            searchable_attributes: Setting::Set(vec![String::from("hello")]),
+            filterable_attributes: Setting::NotSet,
+            sortable_attributes: Setting::NotSet,
+            ranking_rules: Setting::NotSet,
+            stop_words: Setting::NotSet,
+            synonyms: Setting::NotSet,
+            distinct_attribute: Setting::NotSet,
+            typo_tolerance: Setting::NotSet,
+            faceting: Setting::NotSet,
+            pagination: Setting::NotSet,
+            _kind: PhantomData::<Unchecked>,
+        };
+
+        let checked = settings.clone().check();
+        assert_eq!(settings.displayed_attributes, checked.displayed_attributes);
+        assert_eq!(
+            settings.searchable_attributes,
+            checked.searchable_attributes
+        );
+
+        // test wildcard
+        let settings = Settings {
+            displayed_attributes: Setting::Set(vec![String::from("*")]),
+            searchable_attributes: Setting::Set(vec![String::from("hello"), String::from("*")]),
+            filterable_attributes: Setting::NotSet,
+            sortable_attributes: Setting::NotSet,
+            ranking_rules: Setting::NotSet,
+            stop_words: Setting::NotSet,
+            synonyms: Setting::NotSet,
+            distinct_attribute: Setting::NotSet,
+            typo_tolerance: Setting::NotSet,
+            faceting: Setting::NotSet,
+            pagination: Setting::NotSet,
+            _kind: PhantomData::<Unchecked>,
+        };
+
+        let checked = settings.check();
+        assert_eq!(checked.displayed_attributes, Setting::Reset);
+        assert_eq!(checked.searchable_attributes, Setting::Reset);
+    }
+}
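
Note on the settings plumbing above: `apply_settings_to_builder` is only the mapping step. The intended flow is that an unchecked `Settings` payload is validated with `check()` (which, as `test_setting_check` shows, turns a lone "*" into a reset) and then applied to milli's settings builder inside a single write transaction, as `Index::update_settings` does. Below is a rough, hand-written sketch of that flow for reference only; it assumes the `Settings`/`Setting`/`Unchecked` types used in this module are in scope, uses a default `IndexerConfig`, and the function name `sketch_update_settings` is purely illustrative, not part of the patch:

    // Sketch only (not part of the patch): drive apply_settings_to_builder by hand.
    use std::marker::PhantomData;

    fn sketch_update_settings(index: &milli::Index) -> anyhow::Result<()> {
        // Payload as it would arrive from the API: "*" means "reset to default".
        let unchecked = Settings {
            displayed_attributes: Setting::Set(vec!["*".to_string()]),
            searchable_attributes: Setting::NotSet,
            filterable_attributes: Setting::NotSet,
            sortable_attributes: Setting::NotSet,
            ranking_rules: Setting::NotSet,
            stop_words: Setting::NotSet,
            synonyms: Setting::NotSet,
            distinct_attribute: Setting::Reset,
            typo_tolerance: Setting::NotSet,
            faceting: Setting::NotSet,
            pagination: Setting::NotSet,
            _kind: PhantomData::<Unchecked>,
        };
        // After check(), displayed_attributes becomes Setting::Reset (see the test above).
        let checked = unchecked.check();

        // Map the checked payload onto milli's builder inside one write transaction.
        let mut txn = index.write_txn()?;
        let config = milli::update::IndexerConfig::default();
        let mut builder = milli::update::Settings::new(&mut txn, index, &config);
        apply_settings_to_builder(&checked, &mut builder);
        builder.execute(|step| log::debug!("update: {:?}", step))?;
        txn.commit()?;
        Ok(())
    }
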
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index f650dd448..61ddcb882 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -1,20 +1,24 @@
 mod batch;
 pub mod error;
+pub mod index;
 pub mod task;
+mod update_file_store;
 mod utils;
 
+use batch::Batch;
 pub use error::Error;
+use index::Index;
 use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
 pub use task::Task;
-use task::{Kind, Status};
+use task::{Kind, KindWithContent, Status};
 
 use std::collections::hash_map::Entry;
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::{collections::HashMap, sync::RwLock};
 
 use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
-use milli::{Index, RoaringBitmapCodec, BEU32};
+use milli::{RoaringBitmapCodec, BEU32};
 use roaring::RoaringBitmap;
 use serde::Deserialize;
 
@@ -44,7 +48,7 @@ pub struct Query {
 pub struct IndexScheduler {
     // Keep track of the opened indexes and is used
     // mainly by the index resolver.
-    index_map: Arc<RwLock<HashMap<String, Index>>>,
+    index_map: Arc<RwLock<HashMap<String, Index>>>,
 
     /// The list of tasks currently processing.
     processing_tasks: Arc<RwLock<RoaringBitmap>>,
@@ -60,8 +64,8 @@ pub struct IndexScheduler {
     // All the tasks ids grouped by their kind.
     kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
 
-    // Map an index name with an indexuuid.
-    index_name_mapper: Database<Str, Str>,
+    // Tells you whether an index is currently available.
+    available_index: Database<Str, SerdeBincode<bool>>,
 
     // Store the tasks associated to an index.
     index_tasks: Database<Str, RoaringBitmapCodec>,
@@ -75,12 +79,13 @@ impl IndexScheduler {
     /// `IndexNotFound` error.
     pub fn index(&self, name: &str) -> Result<Index> {
         let rtxn = self.env.read_txn()?;
-        let uuid = self
-            .index_name_mapper
+
+        self.available_index
             .get(&rtxn, name)?
-            .ok_or(Error::IndexNotFound)?;
+            .ok_or(Error::IndexNotFound(name.to_string()))?;
+
         // we clone here to drop the lock before entering the match
-        let index = self.index_map.read().unwrap().get(&*uuid).cloned();
+        let index = self.index_map.read().unwrap().get(name).cloned();
         let index = match index {
             Some(index) => index,
             // since we're lazy, it's possible that the index doesn't exist yet.
@@ -93,10 +98,15 @@ impl IndexScheduler {
             // if it's not already there.
             // Since there is a good chance it's not already there we can use
            // the entry method.
-            match index_map.entry(uuid.to_string()) {
+            match index_map.entry(name.to_string()) {
                 Entry::Vacant(entry) => {
-                    // TODO: TAMO: get the envopenoptions from somewhere
-                    let index = milli::Index::new(EnvOpenOptions::new(), uuid)?;
+                    // TODO: TAMO: get the args from somewhere.
+                    let index = Index::open(
+                        name.to_string(),
+                        name.to_string(),
+                        100_000_000,
+                        Arc::default(),
+                    )?;
                     entry.insert(index.clone());
                     index
                 }
@@ -191,6 +201,96 @@ impl IndexScheduler {
         Ok(())
     }
 
+    /// This worker function must be run in a different thread and must be run only once.
+    fn run(&self) {
+        loop {
+            // TODO: TAMO: remove this horrible spinlock in favor of a sleep / channel / we'll see
+            while !self.wake_up.swap(false, Ordering::Relaxed) {}
+
+            let mut wtxn = match self.env.write_txn() {
+                Ok(wtxn) => wtxn,
+                Err(e) => {
+                    log::error!("{}", e);
+                    continue;
+                }
+            };
+            let mut batch = match self.get_next_batch(&wtxn) {
+                Ok(batch) => batch,
+                Err(e) => {
+                    log::error!("{}", e);
+                    continue;
+                }
+            };
+
+            let res = self.process_batch(&mut wtxn, &mut batch);
+
+            // TODO: TAMO: do this later
+            // self.handle_batch_result(res);
+        }
+    }
+
+    fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
+        match batch {
+            Batch::One(task) => match &task.kind {
+                KindWithContent::ClearAllDocuments { index_name } => {
+                    self.index(index_name)?.clear_documents()?;
+                }
+                KindWithContent::RenameIndex {
+                    index_name,
+                    new_name,
+                } => {
+                    if self.available_index.get(wtxn, new_name)?.unwrap_or(false) {
+                        return Err(Error::IndexAlreadyExists(new_name.to_string()));
+                    }
+                    todo!("wait for @guigui insight");
+                }
+                KindWithContent::CreateIndex {
+                    index_name,
+                    primary_key,
+                } => {
+                    if self
+                        .available_index
+                        .get(wtxn, index_name)?
+                        .unwrap_or(false)
+                    {
+                        return Err(Error::IndexAlreadyExists(index_name.to_string()));
+                    }
+
+                    self.available_index.put(wtxn, index_name, &true)?;
+                    // let index =
+                    todo!("tamo: once I get index.rs to works");
+                }
+                KindWithContent::DeleteIndex { index_name } => {
+                    self.index_map.write();
+                    if !self.available_index.delete(wtxn, index_name)? {
+                        return Err(Error::IndexNotFound(index_name.to_string()));
+                    }
+                    todo!("tamo: once I get index.rs to works");
+                }
+                KindWithContent::SwapIndex { lhs, rhs } => {
+                    if !self.available_index.get(wtxn, lhs)?.unwrap_or(false) {
+                        return Err(Error::IndexNotFound(lhs.to_string()));
+                    }
+                    if !self.available_index.get(wtxn, rhs)?.unwrap_or(false) {
+                        return Err(Error::IndexNotFound(rhs.to_string()));
+                    }
+
+                    let index_map = self.index_map.write().unwrap();
+
+                    // index_map.remove.
+                }
+                _ => unreachable!(),
+            },
+            Batch::Cancel(_) => todo!(),
+            Batch::Snapshot(_) => todo!(),
+            Batch::Dump(_) => todo!(),
+            Batch::Contiguous { tasks, kind } => todo!(),
+            Batch::Empty => todo!(),
+        }
+
+        Ok(())
+    }
+
     /// Notify the scheduler there is or may be work to do.
     pub fn notify(&self) {
         self.wake_up
diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs
index 7205bf849..de14afcf6 100644
--- a/index-scheduler/src/task.rs
+++ b/index-scheduler/src/task.rs
@@ -5,7 +5,7 @@ use time::OffsetDateTime;
 
 use crate::TaskId;
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub enum Status {
     Enqueued,
@@ -17,6 +17,8 @@ pub enum Status {
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Task {
+    pub uid: TaskId,
+
     #[serde(with = "time::serde::rfc3339::option")]
     pub enqueued_at: Option<OffsetDateTime>,
     #[serde(with = "time::serde::rfc3339::option")]
@@ -45,7 +47,7 @@ impl Task {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub enum KindWithContent {
     DumpExport {
@@ -173,7 +175,7 @@ impl KindWithContent {
     }
 }
 
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub enum Kind {
     CancelTask,
diff --git a/index-scheduler/src/update_file_store.rs b/index-scheduler/src/update_file_store.rs
new file mode 100644
index 000000000..cb4eadf4d
--- /dev/null
+++ b/index-scheduler/src/update_file_store.rs
@@ -0,0 +1,258 @@
+use std::fs::{create_dir_all, File};
+use std::io::{self, BufReader, BufWriter, Write};
+use std::ops::{Deref, DerefMut};
+use std::path::{Path, PathBuf};
+
+use milli::documents::DocumentsBatchReader;
+use serde_json::Map;
+use tempfile::{NamedTempFile, PersistError};
+use uuid::Uuid;
+
+#[cfg(not(test))]
+pub use store::UpdateFileStore;
+#[cfg(test)]
+pub use test::MockUpdateFileStore as UpdateFileStore;
+
+const UPDATE_FILES_PATH: &str = "updates/updates_files";
+
+use crate::document_formats::read_ndjson;
+
+pub struct UpdateFile {
+    path: PathBuf,
+    file: NamedTempFile,
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error("Error while persisting update to disk: {0}")]
+pub struct UpdateFileStoreError(Box<dyn std::error::Error + Sync + Send + 'static>);
+
+pub type Result<T> = std::result::Result<T, UpdateFileStoreError>;
+
+macro_rules! into_update_store_error {
into_update_store_error { + ($($other:path),*) => { + $( + impl From<$other> for UpdateFileStoreError { + fn from(other: $other) -> Self { + Self(Box::new(other)) + } + } + )* + }; +} + +into_update_store_error!( + PersistError, + io::Error, + serde_json::Error, + milli::documents::Error, + milli::documents::DocumentsBatchCursorError +); + +impl UpdateFile { + pub fn persist(self) -> Result<()> { + self.file.persist(&self.path)?; + Ok(()) + } +} + +impl Deref for UpdateFile { + type Target = NamedTempFile; + + fn deref(&self) -> &Self::Target { + &self.file + } +} + +impl DerefMut for UpdateFile { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.file + } +} + +mod store { + use super::*; + + #[derive(Clone, Debug)] + pub struct UpdateFileStore { + path: PathBuf, + } + + impl UpdateFileStore { + pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { + let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH); + let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH); + + // No update files to load + if !src_update_files_path.exists() { + return Ok(()); + } + + create_dir_all(&dst_update_files_path)?; + + let entries = std::fs::read_dir(src_update_files_path)?; + + for entry in entries { + let entry = entry?; + let update_file = BufReader::new(File::open(entry.path())?); + let file_uuid = entry.file_name(); + let file_uuid = file_uuid + .to_str() + .ok_or_else(|| anyhow::anyhow!("invalid update file name"))?; + let dst_path = dst_update_files_path.join(file_uuid); + let dst_file = BufWriter::new(File::create(dst_path)?); + read_ndjson(update_file, dst_file)?; + } + + Ok(()) + } + + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref().join(UPDATE_FILES_PATH); + std::fs::create_dir_all(&path)?; + Ok(Self { path }) + } + + /// Creates a new temporary update file. + /// A call to `persist` is needed to persist the file in the database. + pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { + let file = NamedTempFile::new_in(&self.path)?; + let uuid = Uuid::new_v4(); + let path = self.path.join(uuid.to_string()); + let update_file = UpdateFile { file, path }; + + Ok((uuid, update_file)) + } + + /// Returns the file corresponding to the requested uuid. + pub fn get_update(&self, uuid: Uuid) -> Result { + let path = self.path.join(uuid.to_string()); + let file = File::open(path)?; + Ok(file) + } + + /// Copies the content of the update file pointed to by `uuid` to the `dst` directory. + pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { + let src = self.path.join(uuid.to_string()); + let mut dst = dst.as_ref().join(UPDATE_FILES_PATH); + std::fs::create_dir_all(&dst)?; + dst.push(uuid.to_string()); + std::fs::copy(src, dst)?; + Ok(()) + } + + /// Peforms a dump of the given update file uuid into the provided dump path. + pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef) -> Result<()> { + let uuid_string = uuid.to_string(); + let update_file_path = self.path.join(&uuid_string); + let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH); + std::fs::create_dir_all(&dst)?; + dst.push(&uuid_string); + + let update_file = File::open(update_file_path)?; + let mut dst_file = NamedTempFile::new_in(&dump_path)?; + let (mut document_cursor, index) = + DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index(); + + let mut document_buffer = Map::new(); + // TODO: we need to find a way to do this more efficiently. (create a custom serializer + // for jsonl for example...) 
+            while let Some(document) = document_cursor.next_document()? {
+                for (field_id, content) in document.iter() {
+                    if let Some(field_name) = index.name(field_id) {
+                        let content = serde_json::from_slice(content)?;
+                        document_buffer.insert(field_name.to_string(), content);
+                    }
+                }
+
+                serde_json::to_writer(&mut dst_file, &document_buffer)?;
+                dst_file.write_all(b"\n")?;
+                document_buffer.clear();
+            }
+
+            dst_file.persist(dst)?;
+
+            Ok(())
+        }
+
+        pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
+            Ok(self.get_update(uuid)?.metadata()?.len())
+        }
+
+        pub async fn delete(&self, uuid: Uuid) -> Result<()> {
+            let path = self.path.join(uuid.to_string());
+            tokio::fs::remove_file(path).await?;
+            Ok(())
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use nelson::Mocker;
+
+    use super::*;
+
+    #[derive(Clone)]
+    pub enum MockUpdateFileStore {
+        Real(store::UpdateFileStore),
+        Mock(Arc<Mocker>),
+    }
+
+    impl MockUpdateFileStore {
+        pub fn mock(mocker: Mocker) -> Self {
+            Self::Mock(Arc::new(mocker))
+        }
+
+        pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+            store::UpdateFileStore::load_dump(src, dst)
+        }
+
+        pub fn new(path: impl AsRef<Path>) -> Result<Self> {
+            store::UpdateFileStore::new(path).map(Self::Real)
+        }
+
+        pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
+            match self {
+                MockUpdateFileStore::Real(s) => s.new_update(),
+                MockUpdateFileStore::Mock(_) => todo!(),
+            }
+        }
+
+        pub fn get_update(&self, uuid: Uuid) -> Result<File> {
+            match self {
+                MockUpdateFileStore::Real(s) => s.get_update(uuid),
+                MockUpdateFileStore::Mock(_) => todo!(),
+            }
+        }
+
+        pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
+            match self {
+                MockUpdateFileStore::Real(s) => s.snapshot(uuid, dst),
+                MockUpdateFileStore::Mock(_) => todo!(),
+            }
+        }
+
+        pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
+            match self {
+                MockUpdateFileStore::Real(s) => s.dump(uuid, dump_path),
+                MockUpdateFileStore::Mock(_) => todo!(),
+            }
+        }
+
+        pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
+            match self {
+                MockUpdateFileStore::Real(s) => s.get_size(uuid),
+                MockUpdateFileStore::Mock(_) => todo!(),
+            }
+        }
+
+        pub async fn delete(&self, uuid: Uuid) -> Result<()> {
+            match self {
+                MockUpdateFileStore::Real(s) => s.delete(uuid).await,
+                MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) },
+            }
+        }
+    }
+}
diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs
index dbebbae2a..c278c93c1 100644
--- a/index-scheduler/src/utils.rs
+++ b/index-scheduler/src/utils.rs
@@ -44,6 +44,36 @@ impl IndexScheduler {
             .collect::<Vec<_>>()
     }
 
+    pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: Task) -> Result<()> {
+        let old_task = self
+            .get_task(wtxn, task.uid)?
+            .ok_or(Error::CorruptedTaskQueue)?;
+
+        if old_task.status != task.status {
+            self.update_status(wtxn, old_task.status, |bitmap| {
+                bitmap.remove(task.uid);
+                bitmap
+            })?;
+            self.update_status(wtxn, task.status, |bitmap| {
+                bitmap.insert(task.uid);
+                bitmap
+            })?;
+        }
+
+        if old_task.kind.as_kind() != task.kind.as_kind() {
+            self.update_kind(wtxn, old_task.kind.as_kind(), |bitmap| {
+                bitmap.remove(task.uid);
+                bitmap
+            })?;
+            self.update_kind(wtxn, task.kind.as_kind(), |bitmap| {
+                bitmap.insert(task.uid);
+                bitmap
+            })?;
+        }
+
+        Ok(())
+    }
+
     pub(crate) fn get_index(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
         Ok(self.index_tasks.get(&rtxn, index)?.unwrap_or_default())
     }
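
A closing note on the scheduling loop: `IndexScheduler::run` currently spins on the `wake_up` AtomicBool (see the TODO about replacing the spinlock with a sleep or a channel). One possible shape for that follow-up, sketched here for reference only and not part of this patch, is a Mutex/Condvar pair that `notify()` would signal and the worker would block on; the `WakeUp` type and its method names below are illustrative assumptions:

    // Sketch only: a blocking wake-up primitive that could replace the spin loop.
    use std::sync::{Condvar, Mutex};

    #[derive(Default)]
    struct WakeUp {
        flag: Mutex<bool>,
        cond: Condvar,
    }

    impl WakeUp {
        /// What `notify()` would call when new tasks are enqueued.
        fn signal(&self) {
            let mut flag = self.flag.lock().unwrap();
            *flag = true;
            self.cond.notify_one();
        }

        /// What the worker loop would call instead of spinning on `wake_up`.
        fn wait(&self) {
            let mut flag = self.flag.lock().unwrap();
            // Guard against spurious wake-ups: only return once a signal arrived.
            while !*flag {
                flag = self.cond.wait(flag).unwrap();
            }
            *flag = false;
        }
    }

Pending notifications are coalesced into a single boolean, which matches the current `swap(false, ...)` semantics: a burst of `notify()` calls still results in one pass over the queue.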