use std::collections::{BTreeSet, HashSet};
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;

use fst::IntoStreamer;
use milli::heed::{EnvOpenOptions, RoTxn};
use milli::update::{IndexerConfig, Setting};
use milli::{obkv_to_json, FieldDistribution, FieldId};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use time::OffsetDateTime;
use uuid::Uuid;

use crate::EnvSizer;

use super::error::IndexError;
use super::error::Result;
use super::updates::{MinWordSizeTyposSetting, TypoSettings};
use super::{Checked, Settings};

pub type Document = Map<String, Value>;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    #[serde(with = "time::serde::rfc3339")]
    pub updated_at: OffsetDateTime,
    pub primary_key: Option<String>,
}

impl IndexMeta {
    pub fn new(index: &Index) -> Result<Self> {
        let txn = index.read_txn()?;
        Self::new_txn(index, &txn)
    }

    pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result<Self> {
        let created_at = index.created_at(txn)?;
        let updated_at = index.updated_at(txn)?;
        let primary_key = index.primary_key(txn)?.map(String::from);

        Ok(Self {
            created_at,
            updated_at,
            primary_key,
        })
    }
}

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
    #[serde(skip)]
    pub size: u64,
    pub number_of_documents: u64,
    /// Whether the current index is performing an update. It is initially `None` when the
    /// index returns it, since it is the `UpdateStore` that knows which index is currently
    /// indexing. It is later set to either true or false, when we retrieve the information
    /// from the `UpdateStore`.
    pub is_indexing: Option<bool>,
    pub field_distribution: FieldDistribution,
}

#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Index {
    pub uuid: Uuid,
    #[derivative(Debug = "ignore")]
    pub inner: Arc<milli::Index>,
    #[derivative(Debug = "ignore")]
    pub indexer_config: Arc<IndexerConfig>,
}

impl Deref for Index {
    type Target = milli::Index;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}

impl Index {
    pub fn open(
        path: impl AsRef<Path>,
        size: usize,
        uuid: Uuid,
        update_handler: Arc<IndexerConfig>,
    ) -> Result<Self> {
        log::debug!("opening index in {}", path.as_ref().display());
        create_dir_all(&path)?;
        let mut options = EnvOpenOptions::new();
        options.map_size(size);
        let inner = Arc::new(milli::Index::new(options, &path)?);
        Ok(Index {
            inner,
            uuid,
            indexer_config: update_handler,
        })
    }

    /// Asynchronously close the underlying index.
    pub fn close(self) {
        self.inner.as_ref().clone().prepare_for_closing();
    }

    pub fn stats(&self) -> Result<IndexStats> {
        let rtxn = self.read_txn()?;

        Ok(IndexStats {
            size: self.size(),
            number_of_documents: self.number_of_documents(&rtxn)?,
            is_indexing: None,
            field_distribution: self.field_distribution(&rtxn)?,
        })
    }

    pub fn meta(&self) -> Result<IndexMeta> {
        IndexMeta::new(self)
    }

    pub fn settings(&self) -> Result<Settings<Checked>> {
        let txn = self.read_txn()?;
        self.settings_txn(&txn)
    }

    pub fn uuid(&self) -> Uuid {
        self.uuid
    }
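
    /// Reads every setting stored in the index (displayed, searchable, filterable and
    /// sortable attributes, ranking rules, stop words, distinct attribute, synonyms and
    /// typo tolerance) within the given read transaction, and converts them from milli's
    /// internal representation into the `Settings<Checked>` type exposed by the API.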
    pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
        let displayed_attributes = self
            .displayed_fields(txn)?
            .map(|fields| fields.into_iter().map(String::from).collect());

        let searchable_attributes = self
            .searchable_fields(txn)?
            .map(|fields| fields.into_iter().map(String::from).collect());

        let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();

        let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();

        let criteria = self
            .criteria(txn)?
            .into_iter()
            .map(|c| c.to_string())
            .collect();

        let stop_words = self
            .stop_words(txn)?
            .map(|stop_words| -> Result<BTreeSet<_>> {
                Ok(stop_words.stream().into_strs()?.into_iter().collect())
            })
            .transpose()?
            .unwrap_or_default();

        let distinct_field = self.distinct_field(txn)?.map(String::from);

        // In milli, each word in the synonyms map was split on its separators. Since we lost
        // this information, we put a space back between the words.
        let synonyms = self
            .synonyms(txn)?
            .iter()
            .map(|(key, values)| {
                (
                    key.join(" "),
                    values.iter().map(|value| value.join(" ")).collect(),
                )
            })
            .collect();

        let min_typo_word_len = MinWordSizeTyposSetting {
            one_typo: Setting::Set(self.min_word_len_one_typo(txn)?),
            two_typos: Setting::Set(self.min_word_len_two_typos(txn)?),
        };

        let disabled_words = self
            .exact_words(txn)?
            .into_stream()
            .into_strs()?
            .into_iter()
            .collect();

        let disabled_attributes = self
            .exact_attributes(txn)?
            .into_iter()
            .map(String::from)
            .collect();

        let typo_tolerance = TypoSettings {
            enabled: Setting::Set(self.authorize_typos(txn)?),
            min_word_size_for_typos: Setting::Set(min_typo_word_len),
            disable_on_words: Setting::Set(disabled_words),
            disable_on_attributes: Setting::Set(disabled_attributes),
        };

        Ok(Settings {
            displayed_attributes: match displayed_attributes {
                Some(attrs) => Setting::Set(attrs),
                None => Setting::Reset,
            },
            searchable_attributes: match searchable_attributes {
                Some(attrs) => Setting::Set(attrs),
                None => Setting::Reset,
            },
            filterable_attributes: Setting::Set(filterable_attributes),
            sortable_attributes: Setting::Set(sortable_attributes),
            ranking_rules: Setting::Set(criteria),
            stop_words: Setting::Set(stop_words),
            distinct_attribute: match distinct_field {
                Some(field) => Setting::Set(field),
                None => Setting::Reset,
            },
            synonyms: Setting::Set(synonyms),
            typo_tolerance: Setting::Set(typo_tolerance),
            _kind: PhantomData,
        })
    }

    pub fn retrieve_documents<S: AsRef<str>>(
        &self,
        offset: usize,
        limit: usize,
        attributes_to_retrieve: Option<Vec<S>>,
    ) -> Result<Vec<Map<String, Value>>> {
        let txn = self.read_txn()?;

        let fields_ids_map = self.fields_ids_map(&txn)?;
        let fields_to_display =
            self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;

        let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);

        let mut documents = Vec::new();

        for entry in iter {
            let (_id, obkv) = entry?;
            let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
            documents.push(object);
        }

        Ok(documents)
    }

    pub fn retrieve_document<S: AsRef<str>>(
        &self,
        doc_id: String,
        attributes_to_retrieve: Option<Vec<S>>,
    ) -> Result<Map<String, Value>> {
        let txn = self.read_txn()?;

        let fields_ids_map = self.fields_ids_map(&txn)?;

        let fields_to_display =
            self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;

        let internal_id = self
            .external_documents_ids(&txn)?
            .get(doc_id.as_bytes())
            .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;

        let document = self
            .documents(&txn, std::iter::once(internal_id))?
            .into_iter()
            .next()
            .map(|(_, d)| d)
            .ok_or(IndexError::DocumentNotFound(doc_id))?;

        let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;

        Ok(document)
    }

    pub fn size(&self) -> u64 {
        self.env.size()
    }
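
    /// Computes the set of field ids that should appear in the returned documents: the
    /// intersection of the index's displayed fields and the caller's
    /// `attributes_to_retrieve`, each of them defaulting to all known fields when unset.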
    fn fields_to_display<S: AsRef<str>>(
        &self,
        txn: &milli::heed::RoTxn,
        attributes_to_retrieve: &Option<Vec<S>>,
        fields_ids_map: &milli::FieldsIdsMap,
    ) -> Result<Vec<FieldId>> {
        let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
            Some(ids) => ids.into_iter().collect::<Vec<_>>(),
            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
        };

        let attributes_to_retrieve_ids = match attributes_to_retrieve {
            Some(attrs) => attrs
                .iter()
                .filter_map(|f| fields_ids_map.id(f.as_ref()))
                .collect::<HashSet<_>>(),
            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
        };

        displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
        Ok(displayed_fields_ids)
    }

    pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
        let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
        create_dir_all(&dst)?;
        dst.push("data.mdb");
        let _txn = self.write_txn()?;
        self.inner
            .env
            .copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
        Ok(())
    }
}

/// When running tests, if a server instance is dropped, the environment is not actually
/// closed, leaving a lot of open file descriptors.
impl Drop for Index {
    fn drop(&mut self) {
        // When dropping the last instance of an index, we want to close the index.
        // Note that the close is actually performed only once all the instances are
        // effectively dropped.

        if Arc::strong_count(&self.inner) == 1 {
            self.inner.as_ref().clone().prepare_for_closing();
        }
    }
}
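
// A minimal usage sketch of the types above, not part of the original module. It assumes a
// `tempfile` dev-dependency and that `IndexerConfig::default()` is available; the map size
// below is an arbitrary example value.
#[cfg(test)]
mod usage_sketch {
    use super::*;

    #[test]
    fn open_stats_and_settings() {
        // Hypothetical temporary directory for the LMDB environment.
        let dir = tempfile::tempdir().unwrap();
        let indexer_config = Arc::new(IndexerConfig::default());

        // Open (and create) the index with a 100 MiB memory map.
        let index = Index::open(
            dir.path(),
            100 * 1024 * 1024,
            Uuid::new_v4(),
            indexer_config,
        )
        .unwrap();

        // A freshly created index holds no documents...
        let stats = index.stats().unwrap();
        assert_eq!(stats.number_of_documents, 0);

        // ...and its settings can be read back as a `Settings<Checked>` value.
        let _settings = index.settings().unwrap();

        index.close();
    }
}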