wip porting the index back in the scheduler

This commit is contained in:
Irevoire 2022-09-07 20:08:07 +02:00 committed by Clément Renault
parent fe330e1be9
commit d8b8e04ad1
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
14 changed files with 2493 additions and 21 deletions

10
Cargo.lock generated
View File

@ -1660,11 +1660,21 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"derivative",
"either",
"fst",
"indexmap",
"log",
"meilisearch-types",
"milli 0.33.0",
"permissive-json-pointer",
"regex",
"roaring 0.9.0",
"serde",
"serde_json",
"thiserror",
"time",
"uuid",
]
[[package]]

View File

@ -8,8 +8,18 @@ edition = "2021"
[dependencies]
anyhow = "1.0.64"
bincode = "1.3.3"
derivative = "2.2.0"
fst = "0.4.7"
indexmap = { version = "1.8.0", features = ["serde-1"] }
log = "0.4.14"
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" }
permissive-json-pointer = { path = "../permissive-json-pointer" }
meilisearch-types = { path = "../meilisearch-types" }
regex = "1.5.5"
either = { version = "1.6.1", features = ["serde"] }
roaring = "0.9.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { version = "1.0.85", features = ["preserve_order"] }
thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.1.2", features = ["serde", "v4"] }

View File

@ -1,12 +1,12 @@
use crate::{
task::{KindWithContent, Status},
Error, IndexScheduler, Result,
Error, IndexScheduler, Result, TaskId,
};
use milli::heed::RoTxn;
use crate::{task::Kind, Task};
pub enum Batch {
pub(crate) enum Batch {
Cancel(Task),
Snapshot(Vec<Task>),
Dump(Vec<Task>),
@ -21,7 +21,7 @@ impl IndexScheduler {
/// 2. We get the *next* snapshot to process.
/// 3. We get the *next* dump to process.
/// 4. We get the *next* tasks to process for a specific index.
fn get_next_batch(&self, rtxn: &RoTxn) -> Result<Batch> {
pub(crate) fn get_next_batch(&self, rtxn: &RoTxn) -> Result<Batch> {
let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued;
@ -108,3 +108,17 @@ impl IndexScheduler {
})
}
}
impl Batch {
pub fn task_ids(&self) -> impl IntoIterator<Item = TaskId> + '_ {
match self {
Batch::Cancel(task) | Batch::One(task) => {
Box::new(std::iter::once(task.uid)) as Box<dyn Iterator<Item = TaskId>>
}
Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::Contiguous { tasks, .. } => {
Box::new(tasks.iter().map(|task| task.uid)) as Box<dyn Iterator<Item = TaskId>>
}
Batch::Empty => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = TaskId>>,
}
}
}

View File

@ -3,8 +3,10 @@ use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("Index not found")]
IndexNotFound,
#[error("Index `{}` not found", .0)]
IndexNotFound(String),
#[error("Index `{}` already exists", .0)]
IndexAlreadyExists(String),
#[error("Corrupted task queue.")]
CorruptedTaskQueue,
#[error(transparent)]

View File

@ -0,0 +1,160 @@
use std::fs::{create_dir_all, File};
use std::io::{BufReader, Seek, SeekFrom, Write};
use std::path::Path;
use anyhow::Context;
use indexmap::IndexMap;
use milli::documents::DocumentsBatchReader;
use milli::heed::{EnvOpenOptions, RoTxn};
use milli::update::{IndexDocumentsConfig, IndexerConfig};
use serde::{Deserialize, Serialize};
use crate::document_formats::read_ndjson;
use crate::index::updates::apply_settings_to_builder;
use super::error::Result;
use super::{index::Index, Settings, Unchecked};
#[derive(Serialize, Deserialize)]
struct DumpMeta {
settings: Settings<Unchecked>,
primary_key: Option<String>,
}
const META_FILE_NAME: &str = "meta.json";
const DATA_FILE_NAME: &str = "documents.jsonl";
impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
// acquire write txn make sure any ongoing write is finished before we start.
let txn = self.write_txn()?;
let path = path.as_ref().join(format!("indexes/{}", self.uuid));
create_dir_all(&path)?;
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
Ok(())
}
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> Result<()> {
let document_file_path = path.as_ref().join(DATA_FILE_NAME);
let mut document_file = File::create(&document_file_path)?;
let documents = self.all_documents(txn)?;
let fields_ids_map = self.fields_ids_map(txn)?;
// dump documents
let mut json_map = IndexMap::new();
for document in documents {
let (_, reader) = document?;
for (fid, bytes) in reader.iter() {
if let Some(name) = fields_ids_map.name(fid) {
json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
}
}
serde_json::to_writer(&mut document_file, &json_map)?;
document_file.write_all(b"\n")?;
json_map.clear();
}
Ok(())
}
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> Result<()> {
let meta_file_path = path.as_ref().join(META_FILE_NAME);
let mut meta_file = File::create(&meta_file_path)?;
let settings = self.settings_txn(txn)?.into_unchecked();
let primary_key = self.primary_key(txn)?.map(String::from);
let meta = DumpMeta {
settings,
primary_key,
};
serde_json::to_writer(&mut meta_file, &meta)?;
Ok(())
}
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let dir_name = src
.as_ref()
.file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME);
let meta_file = File::open(meta_path)?;
let DumpMeta {
settings,
primary_key,
} = serde_json::from_reader(meta_file)?;
let settings = settings.check();
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &dst_dir_path)?;
let mut txn = index.write_txn()?;
// Apply settings first
let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config);
if let Some(primary_key) = primary_key {
builder.set_primary_key(primary_key);
}
apply_settings_to_builder(&settings, &mut builder);
builder.execute(|_| ())?;
let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = BufReader::new(File::open(&document_file_path)?);
let mut tmp_doc_file = tempfile::tempfile()?;
let empty = match read_ndjson(reader, &mut tmp_doc_file) {
// if there was no document in the file it's because the index was empty
Ok(0) => true,
Ok(_) => false,
Err(e) => return Err(e.into()),
};
if !empty {
tmp_doc_file.seek(SeekFrom::Start(0))?;
let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?;
//If the document file is empty, we don't perform the document addition, to prevent
//a primary key error to be thrown.
let config = IndexDocumentsConfig::default();
let builder = milli::update::IndexDocuments::new(
&mut txn,
&index,
indexer_config,
config,
|_| (),
)?;
let (builder, user_error) = builder.add_documents(documents_reader)?;
user_error?;
builder.execute()?;
}
txn.commit()?;
index.prepare_for_closing().wait();
Ok(())
}
}

View File

@ -0,0 +1,61 @@
use std::error::Error;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::internal_error;
use serde_json::Value;
use crate::{error::MilliError, update_file_store};
pub type Result<T> = std::result::Result<T, IndexError>;
#[derive(Debug, thiserror::Error)]
pub enum IndexError {
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn Error + Send + Sync + 'static>),
#[error("Document `{0}` not found.")]
DocumentNotFound(String),
#[error("{0}")]
Facet(#[from] FacetError),
#[error("{0}")]
Milli(#[from] milli::Error),
}
internal_error!(
IndexError: std::io::Error,
milli::heed::Error,
fst::Error,
serde_json::Error,
update_file_store::UpdateFileStoreError,
milli::documents::Error
);
impl ErrorCode for IndexError {
fn error_code(&self) -> Code {
match self {
IndexError::Internal(_) => Code::Internal,
IndexError::DocumentNotFound(_) => Code::DocumentNotFound,
IndexError::Facet(e) => e.error_code(),
IndexError::Milli(e) => MilliError(e).error_code(),
}
}
}
impl From<milli::UserError> for IndexError {
fn from(error: milli::UserError) -> IndexError {
IndexError::Milli(error.into())
}
}
#[derive(Debug, thiserror::Error)]
pub enum FacetError {
#[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))]
InvalidExpression(&'static [&'static str], Value),
}
impl ErrorCode for FacetError {
fn error_code(&self) -> Code {
match self {
FacetError::InvalidExpression(_, _) => Code::Filter,
}
}
}

View File

@ -0,0 +1,326 @@
use std::collections::BTreeSet;
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use fst::IntoStreamer;
use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
use milli::update::{IndexerConfig, Setting};
use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
use super::error::IndexError;
use super::error::Result;
use super::updates::{FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, TypoSettings};
use super::{Checked, Settings};
pub type Document = Map<String, Value>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
pub updated_at: OffsetDateTime,
pub primary_key: Option<String>,
}
impl IndexMeta {
pub fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(txn)?;
let updated_at = index.updated_at(txn)?;
let primary_key = index.primary_key(txn)?.map(String::from);
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
}
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Index {
pub name: String,
#[derivative(Debug = "ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug = "ignore")]
pub indexer_config: Arc<IndexerConfig>,
}
impl Deref for Index {
type Target = milli::Index;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl Index {
pub fn open(
path: impl AsRef<Path>,
name: String,
size: usize,
update_handler: Arc<IndexerConfig>,
) -> Result<Self> {
log::debug!("opening index in {}", path.as_ref().display());
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index {
name,
inner,
indexer_config: update_handler,
})
}
/// Asynchronously close the underlying index
pub fn close(self) {
self.inner.as_ref().clone().prepare_for_closing();
}
pub fn stats(&self) -> Result<IndexStats> {
let rtxn = self.read_txn()?;
Ok(IndexStats {
size: self.size()?,
number_of_documents: self.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: self.field_distribution(&rtxn)?,
})
}
pub fn meta(&self) -> Result<IndexMeta> {
IndexMeta::new(self)
}
pub fn settings(&self) -> Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)
}
pub fn name(&self) -> String {
self.name
}
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let searchable_attributes = self
.user_defined_searchable_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
let criteria = self
.criteria(txn)?
.into_iter()
.map(|c| c.to_string())
.collect();
let stop_words = self
.stop_words(txn)?
.map(|stop_words| -> Result<BTreeSet<_>> {
Ok(stop_words.stream().into_strs()?.into_iter().collect())
})
.transpose()?
.unwrap_or_default();
let distinct_field = self.distinct_field(txn)?.map(String::from);
// in milli each word in the synonyms map were split on their separator. Since we lost
// this information we are going to put space between words.
let synonyms = self
.synonyms(txn)?
.iter()
.map(|(key, values)| {
(
key.join(" "),
values.iter().map(|value| value.join(" ")).collect(),
)
})
.collect();
let min_typo_word_len = MinWordSizeTyposSetting {
one_typo: Setting::Set(self.min_word_len_one_typo(txn)?),
two_typos: Setting::Set(self.min_word_len_two_typos(txn)?),
};
let disabled_words = match self.exact_words(txn)? {
Some(fst) => fst.into_stream().into_strs()?.into_iter().collect(),
None => BTreeSet::new(),
};
let disabled_attributes = self
.exact_attributes(txn)?
.into_iter()
.map(String::from)
.collect();
let typo_tolerance = TypoSettings {
enabled: Setting::Set(self.authorize_typos(txn)?),
min_word_size_for_typos: Setting::Set(min_typo_word_len),
disable_on_words: Setting::Set(disabled_words),
disable_on_attributes: Setting::Set(disabled_attributes),
};
let faceting = FacetingSettings {
max_values_per_facet: Setting::Set(
self.max_values_per_facet(txn)?
.unwrap_or(DEFAULT_VALUES_PER_FACET),
),
};
let pagination = PaginationSettings {
max_total_hits: Setting::Set(
self.pagination_max_total_hits(txn)?
.unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS),
),
};
Ok(Settings {
displayed_attributes: match displayed_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
searchable_attributes: match searchable_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
filterable_attributes: Setting::Set(filterable_attributes),
sortable_attributes: Setting::Set(sortable_attributes),
ranking_rules: Setting::Set(criteria),
stop_words: Setting::Set(stop_words),
distinct_attribute: match distinct_field {
Some(field) => Setting::Set(field),
None => Setting::Reset,
},
synonyms: Setting::Set(synonyms),
typo_tolerance: Setting::Set(typo_tolerance),
faceting: Setting::Set(faceting),
pagination: Setting::Set(pagination),
_kind: PhantomData,
})
}
/// Return the total number of documents contained in the index + the selected documents.
pub fn retrieve_documents<S: AsRef<str>>(
&self,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<(u64, Vec<Document>)> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let mut documents = Vec::new();
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
let (_id, obkv) = entry?;
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
let document = match &attributes_to_retrieve {
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
&document,
attributes_to_retrieve.iter().map(|s| s.as_ref()),
),
None => document,
};
documents.push(document);
}
let number_of_documents = self.number_of_documents(&txn)?;
Ok((number_of_documents, documents))
}
pub fn retrieve_document<S: AsRef<str>>(
&self,
doc_id: String,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Document> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
let internal_id = self
.external_documents_ids(&txn)?
.get(doc_id.as_bytes())
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
let document = self
.documents(&txn, std::iter::once(internal_id))?
.into_iter()
.next()
.map(|(_, d)| d)
.ok_or(IndexError::DocumentNotFound(doc_id))?;
let document = obkv_to_json(&all_fields, &fields_ids_map, document)?;
let document = match &attributes_to_retrieve {
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
&document,
attributes_to_retrieve.iter().map(|s| s.as_ref()),
),
None => document,
};
Ok(document)
}
pub fn size(&self) -> Result<u64> {
self.inner.on_disk_size()
}
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
let mut dst = path.as_ref().join(format!("indexes/{}/", self.name));
create_dir_all(&dst)?;
dst.push("data.mdb");
let _txn = self.write_txn()?;
self.inner.copy_to_path(dst, CompactionOption::Enabled)?;
Ok(())
}
}
/// When running tests, when a server instance is dropped, the environment is not actually closed,
/// leaving a lot of open file descriptors.
impl Drop for Index {
fn drop(&mut self) {
// When dropping the last instance of an index, we want to close the index
// Note that the close is actually performed only if all the instances a effectively
// dropped
if Arc::strong_count(&self.inner) == 1 {
self.inner.as_ref().clone().prepare_for_closing();
}
}
}

View File

@ -0,0 +1,249 @@
pub use search::{
MatchingStrategy, SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
};
pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked};
mod dump;
pub mod error;
mod search;
pub mod updates;
#[allow(clippy::module_inception)]
mod index;
pub use index::{Document, IndexMeta, IndexStats};
#[cfg(not(test))]
pub use index::Index;
#[cfg(test)]
pub use test::MockIndex as Index;
/// The index::test module provides means of mocking an index instance. I can be used throughout the
/// code for unit testing, in places where an index would normally be used.
#[cfg(test)]
pub mod test {
use std::path::{Path, PathBuf};
use std::sync::Arc;
use milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
};
use nelson::Mocker;
use uuid::Uuid;
use super::error::Result;
use super::index::Index;
use super::Document;
use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
use crate::update_file_store::UpdateFileStore;
#[derive(Clone)]
pub enum MockIndex {
Real(Index),
Mock(Arc<Mocker>),
}
impl MockIndex {
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(Arc::new(mocker))
}
pub fn open(
path: impl AsRef<Path>,
size: usize,
uuid: Uuid,
update_handler: Arc<IndexerConfig>,
) -> Result<Self> {
let index = Index::open(path, size, uuid, update_handler)?;
Ok(Self::Real(index))
}
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
update_handler: &IndexerConfig,
) -> anyhow::Result<()> {
Index::load_dump(src, dst, size, update_handler)
}
pub fn uuid(&self) -> Uuid {
match self {
MockIndex::Real(index) => index.uuid(),
MockIndex::Mock(m) => unsafe { m.get("uuid").call(()) },
}
}
pub fn stats(&self) -> Result<IndexStats> {
match self {
MockIndex::Real(index) => index.stats(),
MockIndex::Mock(m) => unsafe { m.get("stats").call(()) },
}
}
pub fn meta(&self) -> Result<IndexMeta> {
match self {
MockIndex::Real(index) => index.meta(),
MockIndex::Mock(_) => todo!(),
}
}
pub fn settings(&self) -> Result<Settings<Checked>> {
match self {
MockIndex::Real(index) => index.settings(),
MockIndex::Mock(_) => todo!(),
}
}
pub fn retrieve_documents<S: AsRef<str>>(
&self,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<(u64, Vec<Document>)> {
match self {
MockIndex::Real(index) => {
index.retrieve_documents(offset, limit, attributes_to_retrieve)
}
MockIndex::Mock(_) => todo!(),
}
}
pub fn retrieve_document<S: AsRef<str>>(
&self,
doc_id: String,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Document> {
match self {
MockIndex::Real(index) => index.retrieve_document(doc_id, attributes_to_retrieve),
MockIndex::Mock(_) => todo!(),
}
}
pub fn size(&self) -> u64 {
match self {
MockIndex::Real(index) => index.size(),
MockIndex::Mock(_) => todo!(),
}
}
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
match self {
MockIndex::Real(index) => index.snapshot(path),
MockIndex::Mock(m) => unsafe { m.get("snapshot").call(path.as_ref()) },
}
}
pub fn close(self) {
match self {
MockIndex::Real(index) => index.close(),
MockIndex::Mock(m) => unsafe { m.get("close").call(()) },
}
}
pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
match self {
MockIndex::Real(index) => index.perform_search(query),
MockIndex::Mock(m) => unsafe { m.get("perform_search").call(query) },
}
}
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
match self {
MockIndex::Real(index) => index.dump(path),
MockIndex::Mock(m) => unsafe { m.get("dump").call(path.as_ref()) },
}
}
pub fn update_documents(
&self,
method: IndexDocumentsMethod,
primary_key: Option<String>,
file_store: UpdateFileStore,
contents: impl Iterator<Item = Uuid>,
) -> Result<Vec<Result<DocumentAdditionResult>>> {
match self {
MockIndex::Real(index) => {
index.update_documents(method, primary_key, file_store, contents)
}
MockIndex::Mock(mocker) => unsafe {
mocker
.get("update_documents")
.call((method, primary_key, file_store, contents))
},
}
}
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
match self {
MockIndex::Real(index) => index.update_settings(settings),
MockIndex::Mock(m) => unsafe { m.get("update_settings").call(settings) },
}
}
pub fn update_primary_key(&self, primary_key: String) -> Result<IndexMeta> {
match self {
MockIndex::Real(index) => index.update_primary_key(primary_key),
MockIndex::Mock(m) => unsafe { m.get("update_primary_key").call(primary_key) },
}
}
pub fn delete_documents(&self, ids: &[String]) -> Result<DocumentDeletionResult> {
match self {
MockIndex::Real(index) => index.delete_documents(ids),
MockIndex::Mock(m) => unsafe { m.get("delete_documents").call(ids) },
}
}
pub fn clear_documents(&self) -> Result<()> {
match self {
MockIndex::Real(index) => index.clear_documents(),
MockIndex::Mock(m) => unsafe { m.get("clear_documents").call(()) },
}
}
}
#[test]
fn test_faux_index() {
let faux = Mocker::default();
faux.when("snapshot")
.times(2)
.then(|_: &Path| -> Result<()> { Ok(()) });
let index = MockIndex::mock(faux);
let path = PathBuf::from("hello");
index.snapshot(&path).unwrap();
index.snapshot(&path).unwrap();
}
#[test]
#[should_panic]
fn test_faux_unexisting_method_stub() {
let faux = Mocker::default();
let index = MockIndex::mock(faux);
let path = PathBuf::from("hello");
index.snapshot(&path).unwrap();
index.snapshot(&path).unwrap();
}
#[test]
#[should_panic]
fn test_faux_panic() {
let faux = Mocker::default();
faux.when("snapshot")
.times(2)
.then(|_: &Path| -> Result<()> {
panic!();
});
let index = MockIndex::mock(faux);
let path = PathBuf::from("hello");
index.snapshot(&path).unwrap();
index.snapshot(&path).unwrap();
}
}

View File

@ -0,0 +1,688 @@
use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::str::FromStr;
use std::time::Instant;
use either::Either;
use milli::tokenizer::TokenizerBuilder;
use milli::{
AscDesc, FieldId, FieldsIdsMap, Filter, FormatOptions, MatchBounds, MatcherBuilder, SortError,
TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET,
};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use crate::index::error::FacetError;
use super::error::{IndexError, Result};
use super::index::Index;
pub type Document = serde_json::Map<String, Value>;
type MatchesPosition = BTreeMap<String, Vec<MatchBounds>>;
pub const DEFAULT_SEARCH_LIMIT: fn() -> usize = || 20;
pub const DEFAULT_CROP_LENGTH: fn() -> usize = || 10;
pub const DEFAULT_CROP_MARKER: fn() -> String = || "".to_string();
pub const DEFAULT_HIGHLIGHT_PRE_TAG: fn() -> String = || "<em>".to_string();
pub const DEFAULT_HIGHLIGHT_POST_TAG: fn() -> String = || "</em>".to_string();
/// The maximimum number of results that the engine
/// will be able to return in one search call.
pub const DEFAULT_PAGINATION_MAX_TOTAL_HITS: usize = 1000;
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct SearchQuery {
pub q: Option<String>,
pub offset: Option<usize>,
#[serde(default = "DEFAULT_SEARCH_LIMIT")]
pub limit: usize,
pub attributes_to_retrieve: Option<BTreeSet<String>>,
pub attributes_to_crop: Option<Vec<String>>,
#[serde(default = "DEFAULT_CROP_LENGTH")]
pub crop_length: usize,
pub attributes_to_highlight: Option<HashSet<String>>,
// Default to false
#[serde(default = "Default::default")]
pub show_matches_position: bool,
pub filter: Option<Value>,
pub sort: Option<Vec<String>>,
pub facets: Option<Vec<String>>,
#[serde(default = "DEFAULT_HIGHLIGHT_PRE_TAG")]
pub highlight_pre_tag: String,
#[serde(default = "DEFAULT_HIGHLIGHT_POST_TAG")]
pub highlight_post_tag: String,
#[serde(default = "DEFAULT_CROP_MARKER")]
pub crop_marker: String,
#[serde(default)]
pub matching_strategy: MatchingStrategy,
}
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum MatchingStrategy {
/// Remove query words from last to first
Last,
/// All query words are mandatory
All,
}
impl Default for MatchingStrategy {
fn default() -> Self {
Self::Last
}
}
impl From<MatchingStrategy> for TermsMatchingStrategy {
fn from(other: MatchingStrategy) -> Self {
match other {
MatchingStrategy::Last => Self::Last,
MatchingStrategy::All => Self::All,
}
}
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct SearchHit {
#[serde(flatten)]
pub document: Document,
#[serde(rename = "_formatted", skip_serializing_if = "Document::is_empty")]
pub formatted: Document,
#[serde(rename = "_matchesPosition", skip_serializing_if = "Option::is_none")]
pub matches_position: Option<MatchesPosition>,
}
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SearchResult {
pub hits: Vec<SearchHit>,
pub estimated_total_hits: u64,
pub query: String,
pub limit: usize,
pub offset: usize,
pub processing_time_ms: u128,
#[serde(skip_serializing_if = "Option::is_none")]
pub facet_distribution: Option<BTreeMap<String, BTreeMap<String, u64>>>,
}
impl Index {
pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
let before_search = Instant::now();
let rtxn = self.read_txn()?;
let mut search = self.search(&rtxn);
if let Some(ref query) = query.q {
search.query(query);
}
search.terms_matching_strategy(query.matching_strategy.into());
let max_total_hits = self
.pagination_max_total_hits(&rtxn)?
.unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS);
// Make sure that a user can't get more documents than the hard limit,
// we align that on the offset too.
let offset = min(query.offset.unwrap_or(0), max_total_hits);
let limit = min(query.limit, max_total_hits.saturating_sub(offset));
search.offset(offset);
search.limit(limit);
if let Some(ref filter) = query.filter {
if let Some(facets) = parse_filter(filter)? {
search.filter(facets);
}
}
if let Some(ref sort) = query.sort {
let sort = match sort.iter().map(|s| AscDesc::from_str(s)).collect() {
Ok(sorts) => sorts,
Err(asc_desc_error) => {
return Err(IndexError::Milli(SortError::from(asc_desc_error).into()))
}
};
search.sort_criteria(sort);
}
let milli::SearchResult {
documents_ids,
matching_words,
candidates,
..
} = search.execute()?;
let fields_ids_map = self.fields_ids_map(&rtxn).unwrap();
let displayed_ids = self
.displayed_fields_ids(&rtxn)?
.map(|fields| fields.into_iter().collect::<BTreeSet<_>>())
.unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect());
let fids = |attrs: &BTreeSet<String>| {
let mut ids = BTreeSet::new();
for attr in attrs {
if attr == "*" {
ids = displayed_ids.clone();
break;
}
if let Some(id) = fields_ids_map.id(attr) {
ids.insert(id);
}
}
ids
};
// The attributes to retrieve are the ones explicitly marked as to retrieve (all by default),
// but these attributes must be also be present
// - in the fields_ids_map
// - in the the displayed attributes
let to_retrieve_ids: BTreeSet<_> = query
.attributes_to_retrieve
.as_ref()
.map(fids)
.unwrap_or_else(|| displayed_ids.clone())
.intersection(&displayed_ids)
.cloned()
.collect();
let attr_to_highlight = query.attributes_to_highlight.unwrap_or_default();
let attr_to_crop = query.attributes_to_crop.unwrap_or_default();
// Attributes in `formatted_options` correspond to the attributes that will be in `_formatted`
// These attributes are:
// - the attributes asked to be highlighted or cropped (with `attributesToCrop` or `attributesToHighlight`)
// - the attributes asked to be retrieved: these attributes will not be highlighted/cropped
// But these attributes must be also present in displayed attributes
let formatted_options = compute_formatted_options(
&attr_to_highlight,
&attr_to_crop,
query.crop_length,
&to_retrieve_ids,
&fields_ids_map,
&displayed_ids,
);
let tokenizer = TokenizerBuilder::default().build();
let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer);
formatter_builder.crop_marker(query.crop_marker);
formatter_builder.highlight_prefix(query.highlight_pre_tag);
formatter_builder.highlight_suffix(query.highlight_post_tag);
let mut documents = Vec::new();
let documents_iter = self.documents(&rtxn, documents_ids)?;
for (_id, obkv) in documents_iter {
// First generate a document with all the displayed fields
let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?;
// select the attributes to retrieve
let attributes_to_retrieve = to_retrieve_ids
.iter()
.map(|&fid| fields_ids_map.name(fid).expect("Missing field name"));
let mut document =
permissive_json_pointer::select_values(&displayed_document, attributes_to_retrieve);
let (matches_position, formatted) = format_fields(
&displayed_document,
&fields_ids_map,
&formatter_builder,
&formatted_options,
query.show_matches_position,
&displayed_ids,
)?;
if let Some(sort) = query.sort.as_ref() {
insert_geo_distance(sort, &mut document);
}
let hit = SearchHit {
document,
formatted,
matches_position,
};
documents.push(hit);
}
let estimated_total_hits = candidates.len();
let facet_distribution = match query.facets {
Some(ref fields) => {
let mut facet_distribution = self.facets_distribution(&rtxn);
let max_values_by_facet = self
.max_values_per_facet(&rtxn)?
.unwrap_or(DEFAULT_VALUES_PER_FACET);
facet_distribution.max_values_per_facet(max_values_by_facet);
if fields.iter().all(|f| f != "*") {
facet_distribution.facets(fields);
}
let distribution = facet_distribution.candidates(candidates).execute()?;
Some(distribution)
}
None => None,
};
let result = SearchResult {
hits: documents,
estimated_total_hits,
query: query.q.clone().unwrap_or_default(),
limit: query.limit,
offset: query.offset.unwrap_or_default(),
processing_time_ms: before_search.elapsed().as_millis(),
facet_distribution,
};
Ok(result)
}
}
fn insert_geo_distance(sorts: &[String], document: &mut Document) {
lazy_static::lazy_static! {
static ref GEO_REGEX: Regex =
Regex::new(r"_geoPoint\(\s*([[:digit:].\-]+)\s*,\s*([[:digit:].\-]+)\s*\)").unwrap();
};
if let Some(capture_group) = sorts.iter().find_map(|sort| GEO_REGEX.captures(sort)) {
// TODO: TAMO: milli encountered an internal error, what do we want to do?
let base = [
capture_group[1].parse().unwrap(),
capture_group[2].parse().unwrap(),
];
let geo_point = &document.get("_geo").unwrap_or(&json!(null));
if let Some((lat, lng)) = geo_point["lat"].as_f64().zip(geo_point["lng"].as_f64()) {
let distance = milli::distance_between_two_points(&base, &[lat, lng]);
document.insert("_geoDistance".to_string(), json!(distance.round() as usize));
}
}
}
fn compute_formatted_options(
attr_to_highlight: &HashSet<String>,
attr_to_crop: &[String],
query_crop_length: usize,
to_retrieve_ids: &BTreeSet<FieldId>,
fields_ids_map: &FieldsIdsMap,
displayed_ids: &BTreeSet<FieldId>,
) -> BTreeMap<FieldId, FormatOptions> {
let mut formatted_options = BTreeMap::new();
add_highlight_to_formatted_options(
&mut formatted_options,
attr_to_highlight,
fields_ids_map,
displayed_ids,
);
add_crop_to_formatted_options(
&mut formatted_options,
attr_to_crop,
query_crop_length,
fields_ids_map,
displayed_ids,
);
// Should not return `_formatted` if no valid attributes to highlight/crop
if !formatted_options.is_empty() {
add_non_formatted_ids_to_formatted_options(&mut formatted_options, to_retrieve_ids);
}
formatted_options
}
fn add_highlight_to_formatted_options(
formatted_options: &mut BTreeMap<FieldId, FormatOptions>,
attr_to_highlight: &HashSet<String>,
fields_ids_map: &FieldsIdsMap,
displayed_ids: &BTreeSet<FieldId>,
) {
for attr in attr_to_highlight {
let new_format = FormatOptions {
highlight: true,
crop: None,
};
if attr == "*" {
for id in displayed_ids {
formatted_options.insert(*id, new_format);
}
break;
}
if let Some(id) = fields_ids_map.id(attr) {
if displayed_ids.contains(&id) {
formatted_options.insert(id, new_format);
}
}
}
}
fn add_crop_to_formatted_options(
formatted_options: &mut BTreeMap<FieldId, FormatOptions>,
attr_to_crop: &[String],
crop_length: usize,
fields_ids_map: &FieldsIdsMap,
displayed_ids: &BTreeSet<FieldId>,
) {
for attr in attr_to_crop {
let mut split = attr.rsplitn(2, ':');
let (attr_name, attr_len) = match split.next().zip(split.next()) {
Some((len, name)) => {
let crop_len = len.parse::<usize>().unwrap_or(crop_length);
(name, crop_len)
}
None => (attr.as_str(), crop_length),
};
if attr_name == "*" {
for id in displayed_ids {
formatted_options
.entry(*id)
.and_modify(|f| f.crop = Some(attr_len))
.or_insert(FormatOptions {
highlight: false,
crop: Some(attr_len),
});
}
}
if let Some(id) = fields_ids_map.id(attr_name) {
if displayed_ids.contains(&id) {
formatted_options
.entry(id)
.and_modify(|f| f.crop = Some(attr_len))
.or_insert(FormatOptions {
highlight: false,
crop: Some(attr_len),
});
}
}
}
}
fn add_non_formatted_ids_to_formatted_options(
formatted_options: &mut BTreeMap<FieldId, FormatOptions>,
to_retrieve_ids: &BTreeSet<FieldId>,
) {
for id in to_retrieve_ids {
formatted_options.entry(*id).or_insert(FormatOptions {
highlight: false,
crop: None,
});
}
}
fn make_document(
displayed_attributes: &BTreeSet<FieldId>,
field_ids_map: &FieldsIdsMap,
obkv: obkv::KvReaderU16,
) -> Result<Document> {
let mut document = serde_json::Map::new();
// recreate the original json
for (key, value) in obkv.iter() {
let value = serde_json::from_slice(value)?;
let key = field_ids_map
.name(key)
.expect("Missing field name")
.to_string();
document.insert(key, value);
}
// select the attributes to retrieve
let displayed_attributes = displayed_attributes
.iter()
.map(|&fid| field_ids_map.name(fid).expect("Missing field name"));
let document = permissive_json_pointer::select_values(&document, displayed_attributes);
Ok(document)
}
fn format_fields<'a, A: AsRef<[u8]>>(
document: &Document,
field_ids_map: &FieldsIdsMap,
builder: &MatcherBuilder<'a, A>,
formatted_options: &BTreeMap<FieldId, FormatOptions>,
compute_matches: bool,
displayable_ids: &BTreeSet<FieldId>,
) -> Result<(Option<MatchesPosition>, Document)> {
let mut matches_position = compute_matches.then(BTreeMap::new);
let mut document = document.clone();
// select the attributes to retrieve
let displayable_names = displayable_ids
.iter()
.map(|&fid| field_ids_map.name(fid).expect("Missing field name"));
permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| {
// To get the formatting option of each key we need to see all the rules that applies
// to the value and merge them together. eg. If a user said he wanted to highlight `doggo`
// and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only
// highlighted.
let format = formatted_options
.iter()
.filter(|(field, _option)| {
let name = field_ids_map.name(**field).unwrap();
milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name)
})
.map(|(_, option)| *option)
.reduce(|acc, option| acc.merge(option));
let mut infos = Vec::new();
*value = format_value(
std::mem::take(value),
builder,
format,
&mut infos,
compute_matches,
);
if let Some(matches) = matches_position.as_mut() {
if !infos.is_empty() {
matches.insert(key.to_owned(), infos);
}
}
});
let selectors = formatted_options
.keys()
// This unwrap must be safe since we got the ids from the fields_ids_map just
// before.
.map(|&fid| field_ids_map.name(fid).unwrap());
let document = permissive_json_pointer::select_values(&document, selectors);
Ok((matches_position, document))
}
fn format_value<'a, A: AsRef<[u8]>>(
value: Value,
builder: &MatcherBuilder<'a, A>,
format_options: Option<FormatOptions>,
infos: &mut Vec<MatchBounds>,
compute_matches: bool,
) -> Value {
match value {
Value::String(old_string) => {
let mut matcher = builder.build(&old_string);
if compute_matches {
let matches = matcher.matches();
infos.extend_from_slice(&matches[..]);
}
match format_options {
Some(format_options) => {
let value = matcher.format(format_options);
Value::String(value.into_owned())
}
None => Value::String(old_string),
}
}
Value::Array(values) => Value::Array(
values
.into_iter()
.map(|v| {
format_value(
v,
builder,
format_options.map(|format_options| FormatOptions {
highlight: format_options.highlight,
crop: None,
}),
infos,
compute_matches,
)
})
.collect(),
),
Value::Object(object) => Value::Object(
object
.into_iter()
.map(|(k, v)| {
(
k,
format_value(
v,
builder,
format_options.map(|format_options| FormatOptions {
highlight: format_options.highlight,
crop: None,
}),
infos,
compute_matches,
),
)
})
.collect(),
),
Value::Number(number) => {
let s = number.to_string();
let mut matcher = builder.build(&s);
if compute_matches {
let matches = matcher.matches();
infos.extend_from_slice(&matches[..]);
}
match format_options {
Some(format_options) => {
let value = matcher.format(format_options);
Value::String(value.into_owned())
}
None => Value::Number(number),
}
}
value => value,
}
}
fn parse_filter(facets: &Value) -> Result<Option<Filter>> {
match facets {
Value::String(expr) => {
let condition = Filter::from_str(expr)?;
Ok(condition)
}
Value::Array(arr) => parse_filter_array(arr),
v => Err(FacetError::InvalidExpression(&["Array"], v.clone()).into()),
}
}
fn parse_filter_array(arr: &[Value]) -> Result<Option<Filter>> {
let mut ands = Vec::new();
for value in arr {
match value {
Value::String(s) => ands.push(Either::Right(s.as_str())),
Value::Array(arr) => {
let mut ors = Vec::new();
for value in arr {
match value {
Value::String(s) => ors.push(s.as_str()),
v => {
return Err(FacetError::InvalidExpression(&["String"], v.clone()).into())
}
}
}
ands.push(Either::Left(ors));
}
v => {
return Err(
FacetError::InvalidExpression(&["String", "[String]"], v.clone()).into(),
)
}
}
}
Ok(Filter::from_array(ands)?)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_insert_geo_distance() {
let value: Document = serde_json::from_str(
r#"{
"_geo": {
"lat": 50.629973371633746,
"lng": 3.0569447399419567
},
"city": "Lille",
"id": "1"
}"#,
)
.unwrap();
let sorters = &["_geoPoint(50.629973371633746,3.0569447399419567):desc".to_string()];
let mut document = value.clone();
insert_geo_distance(sorters, &mut document);
assert_eq!(document.get("_geoDistance"), Some(&json!(0)));
let sorters = &["_geoPoint(50.629973371633746, 3.0569447399419567):asc".to_string()];
let mut document = value.clone();
insert_geo_distance(sorters, &mut document);
assert_eq!(document.get("_geoDistance"), Some(&json!(0)));
let sorters =
&["_geoPoint( 50.629973371633746 , 3.0569447399419567 ):desc".to_string()];
let mut document = value.clone();
insert_geo_distance(sorters, &mut document);
assert_eq!(document.get("_geoDistance"), Some(&json!(0)));
let sorters = &[
"prix:asc",
"villeneuve:desc",
"_geoPoint(50.629973371633746, 3.0569447399419567):asc",
"ubu:asc",
]
.map(|s| s.to_string());
let mut document = value.clone();
insert_geo_distance(sorters, &mut document);
assert_eq!(document.get("_geoDistance"), Some(&json!(0)));
// only the first geoPoint is used to compute the distance
let sorters = &[
"chien:desc",
"_geoPoint(50.629973371633746, 3.0569447399419567):asc",
"pangolin:desc",
"_geoPoint(100.0, -80.0):asc",
"chat:asc",
]
.map(|s| s.to_string());
let mut document = value.clone();
insert_geo_distance(sorters, &mut document);
assert_eq!(document.get("_geoDistance"), Some(&json!(0)));
// there was no _geoPoint so nothing is inserted in the document
let sorters = &["chien:asc".to_string()];
let mut document = value;
insert_geo_distance(sorters, &mut document);
assert_eq!(document.get("_geoDistance"), None);
}
}

View File

@ -0,0 +1,562 @@
use std::collections::{BTreeMap, BTreeSet};
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use log::{debug, info, trace};
use milli::documents::DocumentsBatchReader;
use milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Setting,
};
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;
use super::error::{IndexError, Result};
use super::index::{Index, IndexMeta};
use crate::update_file_store::UpdateFileStore;
fn serialize_with_wildcard<S>(
field: &Setting<Vec<String>>,
s: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let wildcard = vec!["*".to_string()];
match field {
Setting::Set(value) => Some(value),
Setting::Reset => Some(&wildcard),
Setting::NotSet => None,
}
.serialize(s)
}
#[derive(Clone, Default, Debug, Serialize, PartialEq, Eq)]
pub struct Checked;
#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Unchecked;
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct MinWordSizeTyposSetting {
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub one_typo: Setting<u8>,
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub two_typos: Setting<u8>,
}
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct TypoSettings {
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub enabled: Setting<bool>,
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub min_word_size_for_typos: Setting<MinWordSizeTyposSetting>,
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub disable_on_words: Setting<BTreeSet<String>>,
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub disable_on_attributes: Setting<BTreeSet<String>>,
}
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct FacetingSettings {
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub max_values_per_facet: Setting<usize>,
}
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct PaginationSettings {
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
pub max_total_hits: Setting<usize>,
}
/// Holds all the settings for an index. `T` can either be `Checked` if they represents settings
/// whose validity is guaranteed, or `Unchecked` if they need to be validated. In the later case, a
/// call to `check` will return a `Settings<Checked>` from a `Settings<Unchecked>`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
#[serde(bound(serialize = "T: Serialize", deserialize = "T: Deserialize<'static>"))]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct Settings<T> {
#[serde(
default,
serialize_with = "serialize_with_wildcard",
skip_serializing_if = "Setting::is_not_set"
)]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub displayed_attributes: Setting<Vec<String>>,
#[serde(
default,
serialize_with = "serialize_with_wildcard",
skip_serializing_if = "Setting::is_not_set"
)]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub searchable_attributes: Setting<Vec<String>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub filterable_attributes: Setting<BTreeSet<String>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub sortable_attributes: Setting<BTreeSet<String>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub ranking_rules: Setting<Vec<String>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub stop_words: Setting<BTreeSet<String>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub synonyms: Setting<BTreeMap<String, Vec<String>>>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub distinct_attribute: Setting<String>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub typo_tolerance: Setting<TypoSettings>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub faceting: Setting<FacetingSettings>,
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))]
pub pagination: Setting<PaginationSettings>,
#[serde(skip)]
pub _kind: PhantomData<T>,
}
impl Settings<Checked> {
pub fn cleared() -> Settings<Checked> {
Settings {
displayed_attributes: Setting::Reset,
searchable_attributes: Setting::Reset,
filterable_attributes: Setting::Reset,
sortable_attributes: Setting::Reset,
ranking_rules: Setting::Reset,
stop_words: Setting::Reset,
synonyms: Setting::Reset,
distinct_attribute: Setting::Reset,
typo_tolerance: Setting::Reset,
faceting: Setting::Reset,
pagination: Setting::Reset,
_kind: PhantomData,
}
}
pub fn into_unchecked(self) -> Settings<Unchecked> {
let Self {
displayed_attributes,
searchable_attributes,
filterable_attributes,
sortable_attributes,
ranking_rules,
stop_words,
synonyms,
distinct_attribute,
typo_tolerance,
faceting,
pagination,
..
} = self;
Settings {
displayed_attributes,
searchable_attributes,
filterable_attributes,
sortable_attributes,
ranking_rules,
stop_words,
synonyms,
distinct_attribute,
typo_tolerance,
faceting,
pagination,
_kind: PhantomData,
}
}
}
impl Settings<Unchecked> {
pub fn check(self) -> Settings<Checked> {
let displayed_attributes = match self.displayed_attributes {
Setting::Set(fields) => {
if fields.iter().any(|f| f == "*") {
Setting::Reset
} else {
Setting::Set(fields)
}
}
otherwise => otherwise,
};
let searchable_attributes = match self.searchable_attributes {
Setting::Set(fields) => {
if fields.iter().any(|f| f == "*") {
Setting::Reset
} else {
Setting::Set(fields)
}
}
otherwise => otherwise,
};
Settings {
displayed_attributes,
searchable_attributes,
filterable_attributes: self.filterable_attributes,
sortable_attributes: self.sortable_attributes,
ranking_rules: self.ranking_rules,
stop_words: self.stop_words,
synonyms: self.synonyms,
distinct_attribute: self.distinct_attribute,
typo_tolerance: self.typo_tolerance,
faceting: self.faceting,
pagination: self.pagination,
_kind: PhantomData,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub struct Facets {
pub level_group_size: Option<NonZeroUsize>,
pub min_level_size: Option<NonZeroUsize>,
}
impl Index {
fn update_primary_key_txn<'a, 'b>(
&'a self,
txn: &mut milli::heed::RwTxn<'a, 'b>,
primary_key: String,
) -> Result<IndexMeta> {
let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref());
builder.set_primary_key(primary_key);
builder.execute(|_| ())?;
let meta = IndexMeta::new_txn(self, txn)?;
Ok(meta)
}
pub fn update_primary_key(&self, primary_key: String) -> Result<IndexMeta> {
let mut txn = self.write_txn()?;
let res = self.update_primary_key_txn(&mut txn, primary_key)?;
txn.commit()?;
Ok(res)
}
/// Deletes `ids` from the index, and returns how many documents were deleted.
pub fn delete_documents(&self, ids: &[String]) -> Result<DocumentDeletionResult> {
let mut txn = self.write_txn()?;
let mut builder = milli::update::DeleteDocuments::new(&mut txn, self)?;
// We ignore unexisting document ids
ids.iter().for_each(|id| {
builder.delete_external_id(id);
});
let deleted = builder.execute()?;
txn.commit()?;
Ok(deleted)
}
pub fn clear_documents(&self) -> Result<()> {
let mut txn = self.write_txn()?;
milli::update::ClearDocuments::new(&mut txn, self).execute()?;
txn.commit()?;
Ok(())
}
pub fn update_documents(
&self,
method: IndexDocumentsMethod,
primary_key: Option<String>,
file_store: UpdateFileStore,
contents: impl IntoIterator<Item = Uuid>,
) -> Result<Vec<Result<DocumentAdditionResult>>> {
trace!("performing document addition");
let mut txn = self.write_txn()?;
if let Some(primary_key) = primary_key {
if self.primary_key(&txn)?.is_none() {
self.update_primary_key_txn(&mut txn, primary_key)?;
}
}
let config = IndexDocumentsConfig {
update_method: method,
..Default::default()
};
let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
let mut builder = milli::update::IndexDocuments::new(
&mut txn,
self,
self.indexer_config.as_ref(),
config,
indexing_callback,
)?;
let mut results = Vec::new();
for content_uuid in contents.into_iter() {
let content_file = file_store.get_update(content_uuid)?;
let reader = DocumentsBatchReader::from_reader(content_file)?;
let (new_builder, user_result) = builder.add_documents(reader)?;
builder = new_builder;
let user_result = match user_result {
Ok(count) => {
let addition = DocumentAdditionResult {
indexed_documents: count,
number_of_documents: count,
};
info!("document addition done: {:?}", addition);
Ok(addition)
}
Err(e) => Err(IndexError::from(e)),
};
results.push(user_result);
}
if results.iter().any(Result::is_ok) {
let _addition = builder.execute()?;
txn.commit()?;
}
Ok(results)
}
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
// We must use the write transaction of the update here.
let mut txn = self.write_txn()?;
let mut builder =
milli::update::Settings::new(&mut txn, self, self.indexer_config.as_ref());
apply_settings_to_builder(settings, &mut builder);
builder.execute(|indexing_step| debug!("update: {:?}", indexing_step))?;
txn.commit()?;
Ok(())
}
}
pub fn apply_settings_to_builder(
settings: &Settings<Checked>,
builder: &mut milli::update::Settings,
) {
match settings.searchable_attributes {
Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
match settings.displayed_attributes {
Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
match settings.filterable_attributes {
Setting::Set(ref facets) => {
builder.set_filterable_fields(facets.clone().into_iter().collect())
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
match settings.sortable_attributes {
Setting::Set(ref fields) => builder.set_sortable_fields(fields.iter().cloned().collect()),
Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (),
}
match settings.ranking_rules {
Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
match settings.stop_words {
Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
match settings.synonyms {
Setting::Set(ref synonyms) => builder.set_synonyms(synonyms.clone().into_iter().collect()),
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
match settings.distinct_attribute {
Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
Setting::Reset => builder.reset_distinct_field(),
Setting::NotSet => (),
}
match settings.typo_tolerance {
Setting::Set(ref value) => {
match value.enabled {
Setting::Set(val) => builder.set_autorize_typos(val),
Setting::Reset => builder.reset_authorize_typos(),
Setting::NotSet => (),
}
match value.min_word_size_for_typos {
Setting::Set(ref setting) => {
match setting.one_typo {
Setting::Set(val) => builder.set_min_word_len_one_typo(val),
Setting::Reset => builder.reset_min_word_len_one_typo(),
Setting::NotSet => (),
}
match setting.two_typos {
Setting::Set(val) => builder.set_min_word_len_two_typos(val),
Setting::Reset => builder.reset_min_word_len_two_typos(),
Setting::NotSet => (),
}
}
Setting::Reset => {
builder.reset_min_word_len_one_typo();
builder.reset_min_word_len_two_typos();
}
Setting::NotSet => (),
}
match value.disable_on_words {
Setting::Set(ref words) => {
builder.set_exact_words(words.clone());
}
Setting::Reset => builder.reset_exact_words(),
Setting::NotSet => (),
}
match value.disable_on_attributes {
Setting::Set(ref words) => {
builder.set_exact_attributes(words.iter().cloned().collect())
}
Setting::Reset => builder.reset_exact_attributes(),
Setting::NotSet => (),
}
}
Setting::Reset => {
// all typo settings need to be reset here.
builder.reset_authorize_typos();
builder.reset_min_word_len_one_typo();
builder.reset_min_word_len_two_typos();
builder.reset_exact_words();
builder.reset_exact_attributes();
}
Setting::NotSet => (),
}
match settings.faceting {
Setting::Set(ref value) => match value.max_values_per_facet {
Setting::Set(val) => builder.set_max_values_per_facet(val),
Setting::Reset => builder.reset_max_values_per_facet(),
Setting::NotSet => (),
},
Setting::Reset => builder.reset_max_values_per_facet(),
Setting::NotSet => (),
}
match settings.pagination {
Setting::Set(ref value) => match value.max_total_hits {
Setting::Set(val) => builder.set_pagination_max_total_hits(val),
Setting::Reset => builder.reset_pagination_max_total_hits(),
Setting::NotSet => (),
},
Setting::Reset => builder.reset_pagination_max_total_hits(),
Setting::NotSet => (),
}
}
#[cfg(test)]
pub(crate) mod test {
use proptest::prelude::*;
use super::*;
pub(super) fn setting_strategy<T: Arbitrary + Clone>() -> impl Strategy<Value = Setting<T>> {
prop_oneof![
Just(Setting::NotSet),
Just(Setting::Reset),
any::<T>().prop_map(Setting::Set)
]
}
#[test]
fn test_setting_check() {
// test no changes
let settings = Settings {
displayed_attributes: Setting::Set(vec![String::from("hello")]),
searchable_attributes: Setting::Set(vec![String::from("hello")]),
filterable_attributes: Setting::NotSet,
sortable_attributes: Setting::NotSet,
ranking_rules: Setting::NotSet,
stop_words: Setting::NotSet,
synonyms: Setting::NotSet,
distinct_attribute: Setting::NotSet,
typo_tolerance: Setting::NotSet,
faceting: Setting::NotSet,
pagination: Setting::NotSet,
_kind: PhantomData::<Unchecked>,
};
let checked = settings.clone().check();
assert_eq!(settings.displayed_attributes, checked.displayed_attributes);
assert_eq!(
settings.searchable_attributes,
checked.searchable_attributes
);
// test wildcard
// test no changes
let settings = Settings {
displayed_attributes: Setting::Set(vec![String::from("*")]),
searchable_attributes: Setting::Set(vec![String::from("hello"), String::from("*")]),
filterable_attributes: Setting::NotSet,
sortable_attributes: Setting::NotSet,
ranking_rules: Setting::NotSet,
stop_words: Setting::NotSet,
synonyms: Setting::NotSet,
distinct_attribute: Setting::NotSet,
typo_tolerance: Setting::NotSet,
faceting: Setting::NotSet,
pagination: Setting::NotSet,
_kind: PhantomData::<Unchecked>,
};
let checked = settings.check();
assert_eq!(checked.displayed_attributes, Setting::Reset);
assert_eq!(checked.searchable_attributes, Setting::Reset);
}
}

View File

@ -1,20 +1,24 @@
mod batch;
pub mod error;
pub mod index;
pub mod task;
mod update_file_store;
mod utils;
use batch::Batch;
pub use error::Error;
use index::Index;
use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str};
pub use task::Task;
use task::{Kind, Status};
use task::{Kind, KindWithContent, Status};
use std::collections::hash_map::Entry;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{collections::HashMap, sync::RwLock};
use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
use milli::{Index, RoaringBitmapCodec, BEU32};
use milli::{RoaringBitmapCodec, BEU32};
use roaring::RoaringBitmap;
use serde::Deserialize;
@ -44,7 +48,7 @@ pub struct Query {
pub struct IndexScheduler {
// Keep track of the opened indexes and is used
// mainly by the index resolver.
index_map: Arc<RwLock<HashMap<IndexUuid, Index>>>,
index_map: Arc<RwLock<HashMap<String, Index>>>,
/// The list of tasks currently processing.
processing_tasks: Arc<RwLock<RoaringBitmap>>,
@ -60,8 +64,8 @@ pub struct IndexScheduler {
// All the tasks ids grouped by their kind.
kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
// Map an index name with an indexuuid.
index_name_mapper: Database<Str, Str>,
// Tell you if an index is currently available.
available_index: Database<Str, SerdeBincode<bool>>,
// Store the tasks associated to an index.
index_tasks: Database<Str, RoaringBitmapCodec>,
@ -75,12 +79,13 @@ impl IndexScheduler {
/// `IndexNotFound` error.
pub fn index(&self, name: &str) -> Result<Index> {
let rtxn = self.env.read_txn()?;
let uuid = self
.index_name_mapper
self.available_index
.get(&rtxn, name)?
.ok_or(Error::IndexNotFound)?;
.ok_or(Error::IndexNotFound(name.to_string()))?;
// we clone here to drop the lock before entering the match
let index = self.index_map.read().unwrap().get(&*uuid).cloned();
let index = self.index_map.read().unwrap().get(name).cloned();
let index = match index {
Some(index) => index,
// since we're lazy, it's possible that the index doesn't exist yet.
@ -93,10 +98,15 @@ impl IndexScheduler {
// if it's not already there.
// Since there is a good chance it's not already there we can use
// the entry method.
match index_map.entry(uuid.to_string()) {
match index_map.entry(name.to_string()) {
Entry::Vacant(entry) => {
// TODO: TAMO: get the envopenoptions from somewhere
let index = milli::Index::new(EnvOpenOptions::new(), uuid)?;
// TODO: TAMO: get the args from somewhere.
let index = Index::open(
name.to_string(),
name.to_string(),
100_000_000,
Arc::default(),
)?;
entry.insert(index.clone());
index
}
@ -191,6 +201,96 @@ impl IndexScheduler {
Ok(())
}
/// This worker function must be run in a different thread and must be run only once.
fn run(&self) {
loop {
// TODO: TAMO: remove this horrible spinlock in favor of a sleep / channel / well see
while !self.wake_up.swap(false, Ordering::Relaxed) {}
let mut wtxn = match self.env.write_txn() {
Ok(wtxn) => wtxn,
Err(e) => {
log::error!("{}", e);
continue;
}
};
let batch = match self.get_next_batch(&wtxn) {
Ok(batch) => batch,
Err(e) => {
log::error!("{}", e);
continue;
}
};
let res = self.process_batch(&mut wtxn, &mut batch);
// TODO: TAMO: do this later
// self.handle_batch_result(res);
}
}
fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
match batch {
Batch::One(task) => match task.kind {
KindWithContent::ClearAllDocuments { index_name } => {
self.index(&index_name)?.clear_documents()?;
}
KindWithContent::RenameIndex {
index_name,
new_name,
} => {
if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) {
return Err(Error::IndexAlreadyExists);
}
todo!("wait for @guigui insight");
}
KindWithContent::CreateIndex {
index_name,
primary_key,
} => {
if self
.available_index
.get(wtxn, &index_name)?
.unwrap_or(false)
{
return Err(Error::IndexAlreadyExists(index_name.to_string()));
}
self.available_index.put(wtxn, &index_name, &true);
// let index =
todo!("tamo: once I get index.rs to works");
}
KindWithContent::DeleteIndex { index_name } => {
self.index_map.write();
if !self.available_index.delete(wtxn, &index_name)? {
return Err(Error::IndexNotFound(index_name.to_string()));
}
todo!("tamo: once I get index.rs to works");
}
KindWithContent::SwapIndex { lhs, rhs } => {
if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) {
return Err(Error::IndexNotFound(lhs.to_string()));
}
if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) {
return Err(Error::IndexNotFound(rhs.to_string()));
}
let index_map = self.index_map.write()?;
// index_map.remove.
}
_ => unreachable!(),
},
Batch::Cancel(_) => todo!(),
Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(),
Batch::Contiguous { tasks, kind } => todo!(),
Batch::Empty => todo!(),
}
Ok(())
}
/// Notify the scheduler there is or may be work to do.
pub fn notify(&self) {
self.wake_up

View File

@ -5,7 +5,7 @@ use time::OffsetDateTime;
use crate::TaskId;
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status {
Enqueued,
@ -17,6 +17,8 @@ pub enum Status {
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Task {
pub uid: TaskId,
#[serde(with = "time::serde::rfc3339::option")]
pub enqueued_at: Option<OffsetDateTime>,
#[serde(with = "time::serde::rfc3339::option")]
@ -45,7 +47,7 @@ impl Task {
}
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum KindWithContent {
DumpExport {
@ -173,7 +175,7 @@ impl KindWithContent {
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Kind {
CancelTask,

View File

@ -0,0 +1,258 @@
use std::fs::{create_dir_all, File};
use std::io::{self, BufReader, BufWriter, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use milli::documents::DocumentsBatchReader;
use serde_json::Map;
use tempfile::{NamedTempFile, PersistError};
use uuid::Uuid;
#[cfg(not(test))]
pub use store::UpdateFileStore;
#[cfg(test)]
pub use test::MockUpdateFileStore as UpdateFileStore;
const UPDATE_FILES_PATH: &str = "updates/updates_files";
use crate::document_formats::read_ndjson;
pub struct UpdateFile {
path: PathBuf,
file: NamedTempFile,
}
#[derive(Debug, thiserror::Error)]
#[error("Error while persisting update to disk: {0}")]
pub struct UpdateFileStoreError(Box<dyn std::error::Error + Sync + Send + 'static>);
pub type Result<T> = std::result::Result<T, UpdateFileStoreError>;
macro_rules! into_update_store_error {
($($other:path),*) => {
$(
impl From<$other> for UpdateFileStoreError {
fn from(other: $other) -> Self {
Self(Box::new(other))
}
}
)*
};
}
into_update_store_error!(
PersistError,
io::Error,
serde_json::Error,
milli::documents::Error,
milli::documents::DocumentsBatchCursorError
);
impl UpdateFile {
pub fn persist(self) -> Result<()> {
self.file.persist(&self.path)?;
Ok(())
}
}
impl Deref for UpdateFile {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for UpdateFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
mod store {
use super::*;
#[derive(Clone, Debug)]
pub struct UpdateFileStore {
path: PathBuf,
}
impl UpdateFileStore {
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);
// No update files to load
if !src_update_files_path.exists() {
return Ok(());
}
create_dir_all(&dst_update_files_path)?;
let entries = std::fs::read_dir(src_update_files_path)?;
for entry in entries {
let entry = entry?;
let update_file = BufReader::new(File::open(entry.path())?);
let file_uuid = entry.file_name();
let file_uuid = file_uuid
.to_str()
.ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
let dst_path = dst_update_files_path.join(file_uuid);
let dst_file = BufWriter::new(File::create(dst_path)?);
read_ndjson(update_file, dst_file)?;
}
Ok(())
}
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&path)?;
Ok(Self { path })
}
/// Creates a new temporary update file.
/// A call to `persist` is needed to persist the file in the database.
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::new_v4();
let path = self.path.join(uuid.to_string());
let update_file = UpdateFile { file, path };
Ok((uuid, update_file))
}
/// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
let path = self.path.join(uuid.to_string());
let file = File::open(path)?;
Ok(file)
}
/// Copies the content of the update file pointed to by `uuid` to the `dst` directory.
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
let src = self.path.join(uuid.to_string());
let mut dst = dst.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&dst)?;
dst.push(uuid.to_string());
std::fs::copy(src, dst)?;
Ok(())
}
/// Peforms a dump of the given update file uuid into the provided dump path.
pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
let uuid_string = uuid.to_string();
let update_file_path = self.path.join(&uuid_string);
let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&dst)?;
dst.push(&uuid_string);
let update_file = File::open(update_file_path)?;
let mut dst_file = NamedTempFile::new_in(&dump_path)?;
let (mut document_cursor, index) =
DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index();
let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer
// for jsonl for example...)
while let Some(document) = document_cursor.next_document()? {
for (field_id, content) in document.iter() {
if let Some(field_name) = index.name(field_id) {
let content = serde_json::from_slice(content)?;
document_buffer.insert(field_name.to_string(), content);
}
}
serde_json::to_writer(&mut dst_file, &document_buffer)?;
dst_file.write_all(b"\n")?;
document_buffer.clear();
}
dst_file.persist(dst)?;
Ok(())
}
pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
Ok(self.get_update(uuid)?.metadata()?.len())
}
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
let path = self.path.join(uuid.to_string());
tokio::fs::remove_file(path).await?;
Ok(())
}
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use nelson::Mocker;
use super::*;
#[derive(Clone)]
pub enum MockUpdateFileStore {
Real(store::UpdateFileStore),
Mock(Arc<Mocker>),
}
impl MockUpdateFileStore {
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(Arc::new(mocker))
}
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
store::UpdateFileStore::load_dump(src, dst)
}
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
store::UpdateFileStore::new(path).map(Self::Real)
}
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
match self {
MockUpdateFileStore::Real(s) => s.new_update(),
MockUpdateFileStore::Mock(_) => todo!(),
}
}
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
match self {
MockUpdateFileStore::Real(s) => s.get_update(uuid),
MockUpdateFileStore::Mock(_) => todo!(),
}
}
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
match self {
MockUpdateFileStore::Real(s) => s.snapshot(uuid, dst),
MockUpdateFileStore::Mock(_) => todo!(),
}
}
pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
match self {
MockUpdateFileStore::Real(s) => s.dump(uuid, dump_path),
MockUpdateFileStore::Mock(_) => todo!(),
}
}
pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
match self {
MockUpdateFileStore::Real(s) => s.get_size(uuid),
MockUpdateFileStore::Mock(_) => todo!(),
}
}
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
match self {
MockUpdateFileStore::Real(s) => s.delete(uuid).await,
MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) },
}
}
}
}

View File

@ -44,6 +44,36 @@ impl IndexScheduler {
.collect::<Result<_>>()
}
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: Task) -> Result<()> {
let old_task = self
.get_task(wtxn, task.uid)?
.ok_or(Error::CorruptedTaskQueue)?;
if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| {
bitmap.remove(task.uid);
bitmap
})?;
self.update_status(wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
bitmap
})?;
}
if old_task.kind.as_kind() != task.kind.as_kind() {
self.update_kind(wtxn, old_task.kind.as_kind(), |bitmap| {
bitmap.remove(task.uid);
bitmap
})?;
self.update_kind(wtxn, task.kind.as_kind(), |bitmap| {
bitmap.insert(task.uid);
bitmap
})?;
}
Ok(())
}
pub(crate) fn get_index(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
Ok(self.index_tasks.get(&rtxn, index)?.unwrap_or_default())
}