use std::collections::{BTreeSet, HashMap}; use std::io; use std::num::NonZeroUsize; use flate2::read::GzDecoder; use log::info; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{de::Deserializer, Deserialize, Serialize}; use super::Index; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateResult { DocumentsAddition(DocumentAdditionResult), DocumentDeletion { deleted: u64 }, Other, } #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] #[serde(rename_all = "camelCase")] pub struct Settings { #[serde( default, deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub displayed_attributes: Option>>, #[serde( default, deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub searchable_attributes: Option>>, #[serde(default)] pub attributes_for_faceting: Option>>, #[serde( default, deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub ranking_rules: Option>>, #[serde( default, deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub stop_words: Option>>, #[serde( default, deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub distinct_attribute: Option>, } impl Settings { pub fn cleared() -> Self { Self { displayed_attributes: Some(None), searchable_attributes: Some(None), attributes_for_faceting: Some(None), ranking_rules: Some(None), stop_words: Some(None), distinct_attribute: Some(None), } } } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] #[serde(rename_all = "camelCase")] pub struct Facets { pub level_group_size: Option, pub min_level_size: Option, } fn deserialize_some<'de, T, D>(deserializer: D) -> Result, D::Error> where T: Deserialize<'de>, D: Deserializer<'de>, { Deserialize::deserialize(deserializer).map(Some) } impl Index { pub fn update_documents( &self, format: UpdateFormat, method: IndexDocumentsMethod, content: impl io::Read, update_builder: UpdateBuilder, primary_key: Option<&str>, ) -> anyhow::Result { info!("performing document addition"); // We must use the write transaction of the update here. let mut wtxn = self.write_txn()?; // Set the primary key if not set already, ignore if already set. if let (None, Some(ref primary_key)) = (self.primary_key(&wtxn)?, primary_key) { self.put_primary_key(&mut wtxn, primary_key)?; } let mut builder = update_builder.index_documents(&mut wtxn, self); builder.update_format(format); builder.index_documents_method(method); let gzipped = false; let reader = if gzipped { Box::new(GzDecoder::new(content)) } else { Box::new(content) as Box }; let result = builder.execute(reader, |indexing_step, update_id| { info!("update {}: {:?}", update_id, indexing_step) }); info!("document addition done: {:?}", result); result.and_then(|addition_result| { wtxn.commit() .and(Ok(UpdateResult::DocumentsAddition(addition_result))) .map_err(Into::into) }) } pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result { // We must use the write transaction of the update here. let mut wtxn = self.write_txn()?; let builder = update_builder.clear_documents(&mut wtxn, self); match builder.execute() { Ok(_count) => wtxn .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), Err(e) => Err(e), } } pub fn update_settings( &self, settings: &Settings, update_builder: UpdateBuilder, ) -> anyhow::Result { // We must use the write transaction of the update here. let mut wtxn = self.write_txn()?; let mut builder = update_builder.settings(&mut wtxn, self); if let Some(ref names) = settings.searchable_attributes { match names { Some(names) => builder.set_searchable_fields(names.clone()), None => builder.reset_searchable_fields(), } } if let Some(ref names) = settings.displayed_attributes { match names { Some(names) => builder.set_displayed_fields(names.clone()), None => builder.reset_displayed_fields(), } } if let Some(ref facet_types) = settings.attributes_for_faceting { let facet_types = facet_types.clone().unwrap_or_else(HashMap::new); builder.set_faceted_fields(facet_types); } if let Some(ref criteria) = settings.ranking_rules { match criteria { Some(criteria) => builder.set_criteria(criteria.clone()), None => builder.reset_criteria(), } } if let Some(ref stop_words) = settings.stop_words { match stop_words { Some(stop_words) => builder.set_stop_words(stop_words.clone()), _ => builder.reset_stop_words(), } } if let Some(ref distinct_attribute) = settings.distinct_attribute { match distinct_attribute { Some(attr) => builder.set_distinct_attribute(attr.clone()), None => builder.reset_distinct_attribute(), } } let result = builder .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); match result { Ok(()) => wtxn .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), Err(e) => Err(e), } } pub fn update_facets( &self, levels: &Facets, update_builder: UpdateBuilder, ) -> anyhow::Result { // We must use the write transaction of the update here. let mut wtxn = self.write_txn()?; let mut builder = update_builder.facets(&mut wtxn, self); if let Some(value) = levels.level_group_size { builder.level_group_size(value); } if let Some(value) = levels.min_level_size { builder.min_level_size(value); } match builder.execute() { Ok(()) => wtxn .commit() .and(Ok(UpdateResult::Other)) .map_err(Into::into), Err(e) => Err(e), } } pub fn delete_documents( &self, document_ids: impl io::Read, update_builder: UpdateBuilder, ) -> anyhow::Result { let ids: Vec = serde_json::from_reader(document_ids)?; let mut txn = self.write_txn()?; let mut builder = update_builder.delete_documents(&mut txn, self)?; // We ignore unexisting document ids ids.iter().for_each(|id| { builder.delete_external_id(id); }); match builder.execute() { Ok(deleted) => txn .commit() .and(Ok(UpdateResult::DocumentDeletion { deleted })) .map_err(Into::into), Err(e) => Err(e), } } }