diff --git a/benchmarks/benches/indexing.rs b/benchmarks/benches/indexing.rs index ee74f2a80..2d0604750 100644 --- a/benchmarks/benches/indexing.rs +++ b/benchmarks/benches/indexing.rs @@ -70,7 +70,8 @@ fn indexing_songs_default(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); builder.add_documents(documents).unwrap(); @@ -120,7 +121,8 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_1_2, "csv"); builder.add_documents(documents).unwrap(); builder.execute().unwrap(); @@ -134,14 +136,16 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_3_4, "csv"); builder.add_documents(documents).unwrap(); builder.execute().unwrap(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_4_4, "csv"); builder.add_documents(documents).unwrap(); builder.execute().unwrap(); @@ -190,7 +194,8 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); @@ -236,7 +241,8 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); builder.add_documents(documents).unwrap(); @@ -281,7 +287,8 @@ fn indexing_wiki(c: &mut Criterion) { IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); builder.add_documents(documents).unwrap(); @@ -323,7 +330,8 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let indexing_config = 
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_1_2, "csv"); builder.add_documents(documents).unwrap(); @@ -339,7 +347,8 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_3_4, "csv"); @@ -349,7 +358,8 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_4_4, "csv"); @@ -400,7 +410,8 @@ fn indexing_movies_default(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); builder.add_documents(documents).unwrap(); @@ -447,7 +458,8 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES_1_2, "json"); builder.add_documents(documents).unwrap(); @@ -462,7 +474,8 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES_3_4, "json"); builder.add_documents(documents).unwrap(); @@ -470,7 +483,8 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES_4_4, "json"); builder.add_documents(documents).unwrap(); @@ -525,7 +539,8 @@ fn indexing_geo(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); 
builder.add_documents(documents).unwrap(); diff --git a/benchmarks/benches/utils.rs b/benchmarks/benches/utils.rs index 383587ef8..b769bf2c7 100644 --- a/benchmarks/benches/utils.rs +++ b/benchmarks/benches/utils.rs @@ -96,7 +96,8 @@ pub fn base_setup(conf: &Conf) -> Index { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() }; - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); let documents = documents_from(conf.dataset, conf.dataset_format); builder.add_documents(documents).unwrap(); diff --git a/cli/src/main.rs b/cli/src/main.rs index 542b9d472..3e9e8c75f 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -261,7 +261,8 @@ impl Performer for DocumentAddition { &config, indexing_config, |step| indexing_callback(step, &bars), - ); + ) + .unwrap(); addition.add_documents(reader)?; std::thread::spawn(move || { diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 26c1034eb..7a3ed8ebe 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -410,7 +410,7 @@ async fn main() -> anyhow::Result<()> { GLOBAL_CONFIG.get().unwrap(), indexing_config, indexing_callback, - ); + )?; let reader = match encoding.as_deref() { Some("gzip") => Box::new(GzDecoder::new(content)), diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 07c509438..e8723dc6a 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -14,6 +14,7 @@ crossbeam-channel = "0.5.2" either = "1.6.1" fst = "0.4.7" fxhash = "0.2.1" +flatten-serde-json = "0.1.0" grenad = { version = "0.4.1", default-features = false, features = ["tempfile"] } geoutils = "0.4.1" heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] } diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs index 8fd018328..09f15901d 100644 --- a/milli/src/documents/mod.rs +++ b/milli/src/documents/mod.rs @@ -49,6 +49,24 @@ impl DocumentsBatchIndex { pub fn name(&self, id: FieldId) -> Option<&String> { self.0.get_by_left(&id) } + + pub fn recreate_json( + &self, + document: &obkv::KvReaderU16, + ) -> Result, crate::Error> { + let mut map = serde_json::Map::new(); + + for (k, v) in document.iter() { + // TODO: TAMO: update the error type + let key = + self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone(); + let value = serde_json::from_slice::(v) + .map_err(crate::error::InternalError::SerdeJson)?; + map.insert(key, value); + } + + Ok(map) + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/milli/src/error.rs b/milli/src/error.rs index 688977741..a2d5219c1 100644 --- a/milli/src/error.rs +++ b/milli/src/error.rs @@ -27,6 +27,7 @@ pub enum InternalError { DatabaseClosing, DatabaseMissingEntry { db_name: &'static str, key: Option<&'static str> }, FieldIdMapMissingEntry(FieldIdMapMissingEntry), + FieldIdMappingMissingEntry { key: FieldId }, Fst(fst::Error), GrenadInvalidCompressionType, GrenadInvalidFormatVersion, @@ -59,7 +60,7 @@ pub enum UserError { DocumentLimitReached, InvalidDocumentId { document_id: Value }, InvalidFacetsDistribution { invalid_facets_name: BTreeSet }, - InvalidGeoField { document_id: Value, object: Value }, + InvalidGeoField { document_id: Value }, InvalidFilter(String), InvalidSortableAttribute { field: String, valid_fields: BTreeSet }, SortRankingRuleMissing, @@ -187,6 +188,9 @@ impl fmt::Display for InternalError { write!(f, "Missing {} in the 
{} database.", key.unwrap_or("key"), db_name) } Self::FieldIdMapMissingEntry(error) => error.fmt(f), + Self::FieldIdMappingMissingEntry { key } => { + write!(f, "Missing {} in the field id mapping.", key) + } Self::Fst(error) => error.fmt(f), Self::GrenadInvalidCompressionType => { f.write_str("Invalid compression type have been specified to grenad.") @@ -226,19 +230,15 @@ impl fmt::Display for UserError { name_list ) } - Self::InvalidGeoField { document_id, object } => { + Self::InvalidGeoField { document_id } => { let document_id = match document_id { Value::String(id) => id.clone(), _ => document_id.to_string(), }; - let object = match object { - Value::String(id) => id.clone(), - _ => object.to_string(), - }; write!( f, - "The document with the id: `{}` contains an invalid _geo field: `{}`.", - document_id, object + "The document with the id: `{}` contains an invalid `_geo` field.", + document_id ) }, Self::InvalidDocumentId { document_id } => { diff --git a/milli/src/index.rs b/milli/src/index.rs index 42170bc80..3adfd2629 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -31,6 +31,7 @@ pub mod main_key { pub const DISPLAYED_FIELDS_KEY: &str = "displayed-fields"; pub const DISTINCT_FIELD_KEY: &str = "distinct-field-key"; pub const DOCUMENTS_IDS_KEY: &str = "documents-ids"; + pub const HIDDEN_FACETED_FIELDS_KEY: &str = "hidden-faceted-fields"; pub const FILTERABLE_FIELDS_KEY: &str = "filterable-fields"; pub const SORTABLE_FIELDS_KEY: &str = "sortable-fields"; pub const FIELD_DISTRIBUTION_KEY: &str = "fields-distribution"; @@ -567,12 +568,46 @@ impl Index { Ok(fields.into_iter().filter_map(|name| fields_ids_map.id(&name)).collect()) } - /* faceted documents ids */ + /* faceted fields */ + + /// Writes the faceted fields in the database. + pub(crate) fn put_faceted_fields( + &self, + wtxn: &mut RwTxn, + fields: &HashSet, + ) -> heed::Result<()> { + self.main.put::<_, Str, SerdeJson<_>>(wtxn, main_key::HIDDEN_FACETED_FIELDS_KEY, fields) + } /// Returns the faceted fields names. + pub fn faceted_fields(&self, rtxn: &RoTxn) -> heed::Result> { + Ok(self + .main + .get::<_, Str, SerdeJson<_>>(rtxn, main_key::HIDDEN_FACETED_FIELDS_KEY)? + .unwrap_or_default()) + } + + /// Identical to `faceted_fields`, but returns ids instead. + pub fn faceted_fields_ids(&self, rtxn: &RoTxn) -> Result> { + let fields = self.faceted_fields(rtxn)?; + let fields_ids_map = self.fields_ids_map(rtxn)?; + + let mut fields_ids = HashSet::new(); + for name in fields { + if let Some(field_id) = fields_ids_map.id(&name) { + fields_ids.insert(field_id); + } + } + + Ok(fields_ids) + } + + /* faceted documents ids */ + + /// Returns the user defined faceted fields names. /// - /// Faceted fields are the union of all the filterable, sortable, distinct, and Asc/Desc fields. - pub fn faceted_fields(&self, rtxn: &RoTxn) -> Result> { + /// The user faceted fields are the union of all the filterable, sortable, distinct, and Asc/Desc fields. + pub fn user_defined_faceted_fields(&self, rtxn: &RoTxn) -> Result> { let filterable_fields = self.filterable_fields(rtxn)?; let sortable_fields = self.sortable_fields(rtxn)?; let distinct_field = self.distinct_field(rtxn)?; @@ -592,8 +627,8 @@ impl Index { Ok(faceted_fields) } - /// Identical to `faceted_fields`, but returns ids instead. - pub fn faceted_fields_ids(&self, rtxn: &RoTxn) -> Result> { + /// Identical to `user_defined_faceted_fields`, but returns ids instead. 
+ pub fn user_defined_faceted_fields_ids(&self, rtxn: &RoTxn) -> Result> { let fields = self.faceted_fields(rtxn)?; let fields_ids_map = self.fields_ids_map(rtxn)?; @@ -1040,13 +1075,14 @@ pub(crate) mod tests { let content = documents!([ { "id": 1, "name": "kevin" }, { "id": 2, "name": "bob", "age": 20 }, - { "id": 2, "name": "bob", "age": 20 } + { "id": 2, "name": "bob", "age": 20 }, ]); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1067,11 +1103,12 @@ pub(crate) mod tests { // field_distribution in the end let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); let content = documents!([ { "id": 1, "name": "kevin" }, { "id": 2, "name": "bob", "age": 20 }, - { "id": 2, "name": "bob", "age": 20 } + { "id": 2, "name": "bob", "age": 20 }, ]); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1097,7 +1134,8 @@ pub(crate) mod tests { let mut wtxn = index.write_txn().unwrap(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); diff --git a/milli/src/lib.rs b/milli/src/lib.rs index ba2bd9b0f..ec28dbb1b 100644 --- a/milli/src/lib.rs +++ b/milli/src/lib.rs @@ -183,6 +183,43 @@ pub fn lat_lng_to_xyz(coord: &[f64; 2]) -> [f64; 3] { [x, y, z] } +/// Returns `true` if the field match one of the faceted fields. +/// See the function [`is_faceted_by`] below to see what “matching” means. +pub fn is_faceted(field: &str, faceted_fields: impl IntoIterator>) -> bool { + faceted_fields.into_iter().find(|facet| is_faceted_by(field, facet.as_ref())).is_some() +} + +/// Returns `true` if the field match the facet. 
+/// ``` +/// use milli::is_faceted_by; +/// // -- the valid basics +/// assert!(is_faceted_by("animaux", "animaux")); +/// assert!(is_faceted_by("animaux.chien", "animaux")); +/// assert!(is_faceted_by("animaux.chien.race.bouvier bernois.fourrure.couleur", "animaux")); +/// assert!(is_faceted_by("animaux.chien.race.bouvier bernois.fourrure.couleur", "animaux.chien")); +/// assert!(is_faceted_by("animaux.chien.race.bouvier bernois.fourrure.couleur", "animaux.chien.race.bouvier bernois")); +/// assert!(is_faceted_by("animaux.chien.race.bouvier bernois.fourrure.couleur", "animaux.chien.race.bouvier bernois.fourrure")); +/// assert!(is_faceted_by("animaux.chien.race.bouvier bernois.fourrure.couleur", "animaux.chien.race.bouvier bernois.fourrure.couleur")); +/// +/// // -- the wrongs +/// assert!(!is_faceted_by("chien", "chat")); +/// assert!(!is_faceted_by("animaux", "animaux.chien")); +/// assert!(!is_faceted_by("animaux.chien", "animaux.chat")); +/// +/// // -- the strange edge cases +/// assert!(!is_faceted_by("animaux.chien", "anima")); +/// assert!(!is_faceted_by("animaux.chien", "animau")); +/// assert!(!is_faceted_by("animaux.chien", "animaux.")); +/// assert!(!is_faceted_by("animaux.chien", "animaux.c")); +/// assert!(!is_faceted_by("animaux.chien", "animaux.ch")); +/// assert!(!is_faceted_by("animaux.chien", "animaux.chi")); +/// assert!(!is_faceted_by("animaux.chien", "animaux.chie")); +/// ``` +pub fn is_faceted_by(field: &str, facet: &str) -> bool { + field.starts_with(facet) + && field[facet.len()..].chars().next().map(|c| c == '.').unwrap_or(true) +} + #[cfg(test)] mod tests { use serde_json::json; diff --git a/milli/src/search/distinct/mod.rs b/milli/src/search/distinct/mod.rs index 965423886..237fd718a 100644 --- a/milli/src/search/distinct/mod.rs +++ b/milli/src/search/distinct/mod.rs @@ -97,7 +97,8 @@ mod test { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() }; - let mut addition = IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ()); + let mut addition = + IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ()).unwrap(); let reader = crate::documents::DocumentBatchReader::from_reader(Cursor::new(&*JSON)).unwrap(); diff --git a/milli/src/search/facet/facet_distribution.rs b/milli/src/search/facet/facet_distribution.rs index 91bf21cf7..2208ee636 100644 --- a/milli/src/search/facet/facet_distribution.rs +++ b/milli/src/search/facet/facet_distribution.rs @@ -220,9 +220,13 @@ impl<'a> FacetDistribution<'a> { pub fn execute(&self) -> Result>> { let fields_ids_map = self.index.fields_ids_map(self.rtxn)?; let filterable_fields = self.index.filterable_fields(self.rtxn)?; + let fields = match self.facets { Some(ref facets) => { - let invalid_fields: HashSet<_> = facets.difference(&filterable_fields).collect(); + let invalid_fields: HashSet<_> = facets + .iter() + .filter(|facet| !crate::is_faceted(facet, &filterable_fields)) + .collect(); if !invalid_fields.is_empty() { return Err(UserError::InvalidFacetsDistribution { invalid_facets_name: invalid_fields.into_iter().cloned().collect(), @@ -236,10 +240,12 @@ impl<'a> FacetDistribution<'a> { }; let mut distribution = BTreeMap::new(); - for name in fields { - if let Some(fid) = fields_ids_map.id(&name) { + for (fid, name) in fields_ids_map.iter() { + if crate::is_faceted(name, &fields) { let values = self.facet_values(fid)?; - distribution.insert(name, values); + if !values.is_empty() { + distribution.insert(name.to_string(), values); + } } } diff --git 
a/milli/src/search/facet/filter.rs b/milli/src/search/facet/filter.rs index 9388cfa33..8f1ee749f 100644 --- a/milli/src/search/facet/filter.rs +++ b/milli/src/search/facet/filter.rs @@ -353,7 +353,8 @@ impl<'a> Filter<'a> { match &self.condition { FilterCondition::Condition { fid, op } => { let filterable_fields = index.filterable_fields(rtxn)?; - if filterable_fields.contains(fid.value()) { + + if crate::is_faceted(fid.value(), &filterable_fields) { let field_ids_map = index.fields_ids_map(rtxn)?; if let Some(fid) = field_ids_map.id(fid.value()) { Self::evaluate_operator(rtxn, index, numbers_db, strings_db, fid, &op) @@ -549,7 +550,6 @@ mod tests { Filter::from_str("channel = gotaga AND (timestamp = 44 OR channel != ponce)") .unwrap() .unwrap(); - println!("\nExpecting: {:#?}\nGot: {:#?}\n", expected, condition); assert_eq!(condition, expected); } diff --git a/milli/src/search/mod.rs b/milli/src/search/mod.rs index 0d33d9042..b01bae817 100644 --- a/milli/src/search/mod.rs +++ b/milli/src/search/mod.rs @@ -159,7 +159,7 @@ impl<'a> Search<'a> { let sortable_fields = self.index.sortable_fields(self.rtxn)?; for asc_desc in sort_criteria { match asc_desc.member() { - Member::Field(ref field) if !sortable_fields.contains(field) => { + Member::Field(ref field) if !crate::is_faceted(field, &sortable_fields) => { return Err(UserError::InvalidSortableAttribute { field: field.to_string(), valid_fields: sortable_fields.into_iter().collect(), diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs index 3665d2313..f93ba60fa 100644 --- a/milli/src/update/clear_documents.rs +++ b/milli/src/update/clear_documents.rs @@ -98,7 +98,8 @@ mod tests { ]); let indexing_config = IndexDocumentsConfig::default(); let config = IndexerConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -110,7 +111,8 @@ mod tests { let rtxn = index.read_txn().unwrap(); - assert_eq!(index.fields_ids_map(&rtxn).unwrap().len(), 5); + // the value is 7 because there is `[id, name, age, country, _geo, _geo.lng, _geo.lat]` + assert_eq!(index.fields_ids_map(&rtxn).unwrap().len(), 7); assert!(index.words_fst(&rtxn).unwrap().is_empty()); assert!(index.words_prefixes_fst(&rtxn).unwrap().is_empty()); diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs index 77c32f0fb..97250d988 100644 --- a/milli/src/update/delete_documents.rs +++ b/milli/src/update/delete_documents.rs @@ -647,7 +647,8 @@ mod tests { ]); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -681,7 +682,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -733,7 +735,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = 
IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -790,7 +793,8 @@ mod tests { let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs index e58d351d6..65cb1c3ce 100644 --- a/milli/src/update/index_documents/extract/extract_geo_points.rs +++ b/milli/src/update/index_documents/extract/extract_geo_points.rs @@ -2,7 +2,6 @@ use std::fs::File; use std::io; use concat_arrays::concat_arrays; -use serde_json::Value; use super::helpers::{create_writer, writer_into_reader, GrenadParameters}; use crate::{FieldId, InternalError, Result, UserError}; @@ -14,7 +13,7 @@ pub fn extract_geo_points( obkv_documents: grenad::Reader, indexer: GrenadParameters, primary_key_id: FieldId, - geo_field_id: FieldId, + (lat_fid, lng_fid): (FieldId, FieldId), ) -> Result> { let mut writer = create_writer( indexer.chunk_compression_type, @@ -25,22 +24,18 @@ pub fn extract_geo_points( let mut cursor = obkv_documents.into_cursor()?; while let Some((docid_bytes, value)) = cursor.move_on_next()? { let obkv = obkv::KvReader::new(value); - let point: Value = match obkv.get(geo_field_id) { - Some(point) => serde_json::from_slice(point).map_err(InternalError::SerdeJson)?, - None => continue, - }; - - if let Some((lat, lng)) = point["lat"].as_f64().zip(point["lng"].as_f64()) { - // this will create an array of 16 bytes (two 8 bytes floats) - let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()]; - writer.insert(docid_bytes, bytes)?; - } else { - // All document must have a primary key so we can unwrap safely here + let (lat, lng) = obkv.get(lat_fid).zip(obkv.get(lng_fid)).ok_or_else(|| { let primary_key = obkv.get(primary_key_id).unwrap(); - let primary_key = - serde_json::from_slice(primary_key).map_err(InternalError::SerdeJson)?; - Err(UserError::InvalidGeoField { document_id: primary_key, object: point })? - } + let primary_key = serde_json::from_slice(primary_key).unwrap(); + UserError::InvalidGeoField { document_id: primary_key } + })?; + let (lat, lng): (f64, f64) = ( + serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?, + serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?, + ); + + let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()]; + writer.insert(docid_bytes, bytes)?; } Ok(writer_into_reader(writer)?) diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 8f6797a3b..c3c2033a6 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -34,28 +34,36 @@ use crate::{FieldId, Result}; /// Extract data for each databases from obkv documents in parallel. /// Send data in grenad file over provided Sender. 
pub(crate) fn data_from_obkv_documents( - obkv_chunks: impl Iterator>> + Send, + original_obkv_chunks: impl Iterator>> + Send, + flattened_obkv_chunks: impl Iterator>> + Send, indexer: GrenadParameters, lmdb_writer_sx: Sender>, searchable_fields: Option>, faceted_fields: HashSet, primary_key_id: FieldId, - geo_field_id: Option, + geo_fields_ids: Option<(FieldId, FieldId)>, stop_words: Option>, max_positions_per_attributes: Option, exact_attributes: HashSet, ) -> Result<()> { - let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks + original_obkv_chunks .par_bridge() - .map(|result| { - extract_documents_data( - result, + .map(|original_documents_chunk| { + send_original_documents_data(original_documents_chunk, lmdb_writer_sx.clone()) + }) + .collect::>()?; + + let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = flattened_obkv_chunks + .par_bridge() + .map(|flattened_obkv_chunks| { + send_and_extract_flattened_documents_data( + flattened_obkv_chunks, indexer, lmdb_writer_sx.clone(), &searchable_fields, &faceted_fields, primary_key_id, - geo_field_id, + geo_fields_ids, &stop_words, max_positions_per_attributes, ) @@ -170,36 +178,48 @@ fn spawn_extraction_task( }); } -/// Extract chuncked data and send it into lmdb_writer_sx sender: +/// Extract chunked data and send it into lmdb_writer_sx sender: /// - documents +fn send_original_documents_data( + original_documents_chunk: Result>, + lmdb_writer_sx: Sender>, +) -> Result<()> { + let original_documents_chunk = + original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; + + // TODO: create a custom internal error + lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))).unwrap(); + Ok(()) +} + +/// Extract chunked data and send it into lmdb_writer_sx sender: /// - documents_ids /// - docid_word_positions /// - docid_fid_facet_numbers /// - docid_fid_facet_strings -fn extract_documents_data( - documents_chunk: Result>, +fn send_and_extract_flattened_documents_data( + flattened_documents_chunk: Result>, indexer: GrenadParameters, lmdb_writer_sx: Sender>, searchable_fields: &Option>, faceted_fields: &HashSet, primary_key_id: FieldId, - geo_field_id: Option, + geo_fields_ids: Option<(FieldId, FieldId)>, stop_words: &Option>, max_positions_per_attributes: Option, ) -> Result<( grenad::Reader, (grenad::Reader, grenad::Reader), )> { - let documents_chunk = documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; + let flattened_documents_chunk = + flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; - let _ = lmdb_writer_sx.send(Ok(TypedChunk::Documents(documents_chunk.clone()))); - - if let Some(geo_field_id) = geo_field_id { - let documents_chunk_cloned = documents_chunk.clone(); + if let Some(geo_fields_ids) = geo_fields_ids { + let documents_chunk_cloned = flattened_documents_chunk.clone(); let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); rayon::spawn(move || { let result = - extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_field_id); + extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_fields_ids); let _ = match result { Ok(geo_points) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoPoints(geo_points))), Err(error) => lmdb_writer_sx_cloned.send(Err(error)), @@ -211,7 +231,7 @@ fn extract_documents_data( rayon::join( || { let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions( - documents_chunk.clone(), + flattened_documents_chunk.clone(), indexer.clone(), searchable_fields, stop_words.as_ref(), @@ -232,7 
+252,7 @@ fn extract_documents_data( || { let (docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk) = extract_fid_docid_facet_values( - documents_chunk.clone(), + flattened_documents_chunk.clone(), indexer.clone(), faceted_fields, )?; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 0e6e59e10..eb50a85ed 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -30,7 +30,7 @@ use crate::update::{ self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst, }; -use crate::{Index, Result, RoaringBitmapCodec}; +use crate::{Index, Result, RoaringBitmapCodec, UserError}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 5; @@ -94,15 +94,16 @@ where indexer_config: &'a IndexerConfig, config: IndexDocumentsConfig, progress: F, - ) -> IndexDocuments<'t, 'u, 'i, 'a, F> { + ) -> Result> { let transform = Some(Transform::new( + wtxn, &index, indexer_config, config.update_method, config.autogenerate_docids, - )); + )?); - IndexDocuments { + Ok(IndexDocuments { transform, config, indexer_config, @@ -110,7 +111,7 @@ where wtxn, index, added_documents: 0, - } + }) } /// Adds a batch of documents to the current builder. @@ -151,6 +152,10 @@ where .take() .expect("Invalid document addition state") .output_from_sorter(self.wtxn, &self.progress)?; + + let new_facets = output.compute_real_facets(self.wtxn, self.index)?; + self.index.put_faceted_fields(self.wtxn, &new_facets)?; + let indexed_documents = output.documents_count as u64; let number_of_documents = self.execute_raw(output)?; @@ -171,7 +176,8 @@ where new_documents_ids, replaced_documents_ids, documents_count, - documents_file, + original_documents, + flattened_documents, } = output; // The fields_ids_map is put back to the store now so the rest of the transaction sees an @@ -197,7 +203,8 @@ where } }; - let documents_file = grenad::Reader::new(documents_file)?; + let original_documents = grenad::Reader::new(original_documents)?; + let flattened_documents = grenad::Reader::new(flattened_documents)?; // create LMDB writer channel let (lmdb_writer_sx, lmdb_writer_rx): ( @@ -213,13 +220,20 @@ where self.index.searchable_fields_ids(self.wtxn)?.map(HashSet::from_iter); // get filterable fields for facet databases let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?; - // get the fid of the `_geo` field. - let geo_field_id = match self.index.fields_ids_map(self.wtxn)?.id("_geo") { + // get the fid of the `_geo.lat` and `_geo.lng` fields. + let geo_fields_ids = match self.index.fields_ids_map(self.wtxn)?.id("_geo") { Some(gfid) => { let is_sortable = self.index.sortable_fields_ids(self.wtxn)?.contains(&gfid); let is_filterable = self.index.filterable_fields_ids(self.wtxn)?.contains(&gfid); + // if `_geo` is faceted then we get the `lat` and `lng` if is_sortable || is_filterable { - Some(gfid) + let field_ids = self + .index + .fields_ids_map(self.wtxn)? + .insert("_geo.lat") + .zip(self.index.fields_ids_map(self.wtxn)?.insert("_geo.lng")) + .ok_or(UserError::AttributeLimitReached)?; + Some(field_ids) } else { None } @@ -239,28 +253,38 @@ where max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. 
}; - // split obkv file into several chuncks - let chunk_iter = grenad_obkv_into_chunks( - documents_file, + // split obkv file into several chunks + let original_chunk_iter = grenad_obkv_into_chunks( + original_documents, params.clone(), self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4), // 4MiB ); - let result = chunk_iter.map(|chunk_iter| { - // extract all databases from the chunked obkv douments - extract::data_from_obkv_documents( - chunk_iter, - params, - lmdb_writer_sx.clone(), - searchable_fields, - faceted_fields, - primary_key_id, - geo_field_id, - stop_words, - self.indexer_config.max_positions_per_attributes, - exact_attributes, - ) - }); + // split obkv file into several chunks + let flattened_chunk_iter = grenad_obkv_into_chunks( + flattened_documents, + params.clone(), + self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4), // 4MiB + ); + + let result = original_chunk_iter + .and_then(|original_chunk_iter| Ok((original_chunk_iter, flattened_chunk_iter?))) + .map(|(original_chunk, flattened_chunk)| { + // extract all databases from the chunked obkv douments + extract::data_from_obkv_documents( + original_chunk, + flattened_chunk, + params, + lmdb_writer_sx.clone(), + searchable_fields, + faceted_fields, + primary_key_id, + geo_fields_ids, + stop_words, + self.indexer_config.max_positions_per_attributes, + exact_attributes, + ) + }); if let Err(e) = result { let _ = lmdb_writer_sx.send(Err(e)); @@ -550,6 +574,7 @@ mod tests { use big_s::S; use heed::EnvOpenOptions; + use maplit::hashset; use super::*; use crate::documents::DocumentBatchBuilder; @@ -574,7 +599,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -589,7 +615,8 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "id": 1, "name": "updated kevin" } ]); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -607,7 +634,8 @@ mod tests { { "id": 2, "name": "updated kevina" }, { "id": 3, "name": "updated benoit" } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); wtxn.commit().unwrap(); @@ -639,7 +667,8 @@ mod tests { ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -665,7 +694,8 @@ mod tests { // Second we send 1 document with id 1, to force it to be merged with the previous one. 
let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "id": 1, "age": 25 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -706,7 +736,8 @@ mod tests { ]); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); assert!(builder.add_documents(content).is_err()); wtxn.commit().unwrap(); @@ -735,7 +766,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -753,7 +785,8 @@ mod tests { // Second we send 1 document with the generated uuid, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "name": "updated kevin", "id": kevin_uuid } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -793,7 +826,8 @@ mod tests { ]); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -809,7 +843,8 @@ mod tests { let content = documents!([ { "name": "new kevin" } ]); let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -833,7 +868,8 @@ mod tests { let content = documents!([]); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -859,7 +895,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); assert!(builder.add_documents(content).is_err()); wtxn.commit().unwrap(); @@ -867,7 +904,8 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); 
// There is a space in the document id. let content = documents!([ { "id": 32, "name": "kevin" } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -895,7 +933,8 @@ mod tests { ]); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -912,7 +951,7 @@ mod tests { assert_eq!(result.documents_ids, vec![1]); // Search for a sub array sub object key - let result = index.search(&rtxn).query(r#""wow""#).execute().unwrap(); + let result = index.search(&rtxn).query(r#""amazing""#).execute().unwrap(); assert_eq!(result.documents_ids, vec![2]); drop(rtxn); @@ -940,7 +979,8 @@ mod tests { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() }; - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); builder.add_documents(documents).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -950,7 +990,8 @@ mod tests { update_method: IndexDocumentsMethod::UpdateDocuments, ..Default::default() }; - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); let documents = documents!([ { "id": 2, @@ -981,7 +1022,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1000,7 +1042,8 @@ mod tests { ]); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); let external_documents_ids = index.external_documents_ids(&wtxn).unwrap(); @@ -1011,7 +1054,8 @@ mod tests { ]); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1046,7 +1090,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1080,7 +1125,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, 
&config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1137,13 +1183,333 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); } + #[test] + fn index_documents_with_nested_fields() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + + let mut wtxn = index.write_txn().unwrap(); + let content = documents!([ + { + "id": 0, + "title": "The zeroth document", + }, + { + "id": 1, + "title": "The first document", + "nested": { + "object": "field", + "machin": "bidule", + }, + }, + { + "id": 2, + "title": "The second document", + "nested": [ + "array", + { + "object": "field", + }, + { + "prout": "truc", + "machin": "lol", + }, + ], + }, + { + "id": 3, + "title": "The third document", + "nested": "I lied", + }, + ]); + + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); + + wtxn.commit().unwrap(); + + let mut wtxn = index.write_txn().unwrap(); + let mut builder = update::Settings::new(&mut wtxn, &index, &config); + + let searchable_fields = vec![S("title"), S("nested.object"), S("nested.machin")]; + builder.set_searchable_fields(searchable_fields); + + let faceted_fields = hashset!(S("title"), S("nested.object"), S("nested.machin")); + builder.set_filterable_fields(faceted_fields); + builder.execute(|_| ()).unwrap(); + wtxn.commit().unwrap(); + + let rtxn = index.read_txn().unwrap(); + + let facets = index.faceted_fields(&rtxn).unwrap(); + assert_eq!(facets, hashset!(S("title"), S("nested.object"), S("nested.machin"))); + + // testing the simple query search + let mut search = crate::Search::new(&rtxn, &index); + search.query("document"); + search.authorize_typos(true); + search.optional_words(true); + // all documents should be returned + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids.len(), 4); + + search.query("zeroth"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![0]); + search.query("first"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1]); + search.query("second"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![2]); + search.query("third"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![3]); + + search.query("field"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1, 2]); + + search.query("lol"); + let crate::SearchResult { documents_ids, .. 
} = search.execute().unwrap(); + assert_eq!(documents_ids, vec![2]); + + search.query("object"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert!(documents_ids.is_empty()); + + search.query("array"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert!(documents_ids.is_empty()); // nested is not searchable + + search.query("lied"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert!(documents_ids.is_empty()); // nested is not searchable + + // testing the filters + let mut search = crate::Search::new(&rtxn, &index); + search.filter(crate::Filter::from_str(r#"title = "The first document""#).unwrap().unwrap()); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1]); + + search.filter(crate::Filter::from_str(r#"nested.object = field"#).unwrap().unwrap()); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1, 2]); + + search.filter(crate::Filter::from_str(r#"nested.machin = bidule"#).unwrap().unwrap()); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1]); + + search.filter(crate::Filter::from_str(r#"nested = array"#).unwrap().unwrap()); + let error = search.execute().map(|_| unreachable!()).unwrap_err(); // nested is not filterable + assert!(matches!(error, crate::Error::UserError(crate::UserError::InvalidFilter(_)))); + + search.filter(crate::Filter::from_str(r#"nested = "I lied""#).unwrap().unwrap()); + let error = search.execute().map(|_| unreachable!()).unwrap_err(); // nested is not filterable + assert!(matches!(error, crate::Error::UserError(crate::UserError::InvalidFilter(_)))); + } + + #[test] + fn index_documents_with_nested_primary_key() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); + + let mut wtxn = index.write_txn().unwrap(); + let mut builder = update::Settings::new(&mut wtxn, &index, &config); + builder.set_primary_key("nested.id".to_owned()); + builder.execute(|_| ()).unwrap(); + wtxn.commit().unwrap(); + + let mut wtxn = index.write_txn().unwrap(); + let content = documents!([ + { + "nested": { + "id": 0, + }, + "title": "The zeroth document", + }, + { + "nested": { + "id": 1, + }, + "title": "The first document", + }, + { + "nested": { + "id": 2, + }, + "title": "The second document", + }, + { + "nested.id": 3, + "title": "The third document", + }, + ]); + + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); + wtxn.commit().unwrap(); + + let rtxn = index.read_txn().unwrap(); + + // testing the simple query search + let mut search = crate::Search::new(&rtxn, &index); + search.query("document"); + search.authorize_typos(true); + search.optional_words(true); + // all documents should be returned + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids.len(), 4); + + search.query("zeroth"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![0]); + search.query("first"); + let crate::SearchResult { documents_ids, .. 
} = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1]); + search.query("second"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![2]); + search.query("third"); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![3]); + } + + #[test] + fn test_facets_generation() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + + let mut wtxn = index.write_txn().unwrap(); + let content = documents!([ + { + "id": 0, + "dog": { + "race": { + "bernese mountain": "zeroth", + }, + }, + }, + { + "id": 1, + "dog.race": { + "bernese mountain": "first", + }, + }, + { + "id": 2, + "dog.race.bernese mountain": "second", + }, + { + "id": 3, + "dog": { + "race.bernese mountain": "third" + }, + }, + ]); + + // index the documents + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); + + wtxn.commit().unwrap(); + + // ---- ADD THE SETTING TO TEST THE FILTERABLE + + // add the settings + let mut wtxn = index.write_txn().unwrap(); + let mut builder = update::Settings::new(&mut wtxn, &index, &config); + + builder.set_filterable_fields(hashset!(String::from("dog"))); + + builder.execute(|_| ()).unwrap(); + wtxn.commit().unwrap(); + + let rtxn = index.read_txn().unwrap(); + + let hidden = index.faceted_fields(&rtxn).unwrap(); + + assert_eq!(hidden, hashset!(S("dog"), S("dog.race"), S("dog.race.bernese mountain"))); + + for (s, i) in [("zeroth", 0), ("first", 1), ("second", 2), ("third", 3)] { + let mut search = crate::Search::new(&rtxn, &index); + let filter = format!(r#""dog.race.bernese mountain" = {s}"#); + search.filter(crate::Filter::from_str(&filter).unwrap().unwrap()); + let crate::SearchResult { documents_ids, .. } = search.execute().unwrap(); + assert_eq!(documents_ids, vec![i]); + } + + // ---- RESET THE SETTINGS + + // update the settings + let mut wtxn = index.write_txn().unwrap(); + let mut builder = update::Settings::new(&mut wtxn, &index, &config); + + builder.reset_filterable_fields(); + + builder.execute(|_| ()).unwrap(); + wtxn.commit().unwrap(); + + let rtxn = index.read_txn().unwrap(); + + let facets = index.faceted_fields(&rtxn).unwrap(); + + assert_eq!(facets, hashset!()); + + // ---- UPDATE THE SETTINGS TO TEST THE SORTABLE + + // update the settings + let mut wtxn = index.write_txn().unwrap(); + let mut builder = update::Settings::new(&mut wtxn, &index, &config); + + builder.set_sortable_fields(hashset!(S("dog.race"))); + + builder.execute(|_| ()).unwrap(); + wtxn.commit().unwrap(); + + let rtxn = index.read_txn().unwrap(); + + let facets = index.faceted_fields(&rtxn).unwrap(); + + assert_eq!(facets, hashset!(S("dog.race"), S("dog.race.bernese mountain"))); + + let mut search = crate::Search::new(&rtxn, &index); + search.sort_criteria(vec![crate::AscDesc::Asc(crate::Member::Field(S( + "dog.race.bernese mountain", + )))]); + let crate::SearchResult { documents_ids, .. 
} = search.execute().unwrap(); + assert_eq!(documents_ids, vec![1, 2, 3, 0]); + } + #[test] fn index_2_times_documents_split_by_zero_document_indexation() { let path = tempfile::tempdir().unwrap(); @@ -1162,7 +1528,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1178,7 +1545,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1199,7 +1567,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1226,7 +1595,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 4ec34c0c6..4413e00ca 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -1,24 +1,27 @@ use std::borrow::Cow; -use std::collections::btree_map::Entry; -use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; -use std::time::Instant; +use byteorder::ReadBytesExt; +use fxhash::FxHashMap; +use heed::RoTxn; use itertools::Itertools; -use log::info; +use obkv::{KvReader, KvWriter}; use roaring::RoaringBitmap; use serde_json::{Map, Value}; -use super::helpers::{ - create_sorter, create_writer, keep_latest_obkv, merge_obkvs, merge_two_obkvs, MergeFn, -}; +use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn}; use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentBatchReader, DocumentsBatchIndex}; use crate::error::{Error, InternalError, UserError}; use crate::index::db_name; use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; -use crate::{ExternalDocumentsIds, FieldDistribution, FieldId, FieldsIdsMap, Index, Result, BEU32}; +use crate::{ + ExternalDocumentsIds, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, + Result, BEU32, +}; const DEFAULT_PRIMARY_KEY_NAME: &str = "id"; @@ -30,7 +33,8 @@ pub struct TransformOutput { pub new_documents_ids: RoaringBitmap, pub replaced_documents_ids: RoaringBitmap, pub documents_count: usize, - pub documents_file: File, + pub 
original_documents: File, + pub flattened_documents: File, } /// Extract the external ids, deduplicate and compute the new internal documents ids @@ -41,11 +45,17 @@ pub struct TransformOutput { /// containing all those documents. pub struct Transform<'a, 'i> { pub index: &'i Index, + fields_ids_map: FieldsIdsMap, + indexer_settings: &'a IndexerConfig, pub autogenerate_docids: bool, pub index_documents_method: IndexDocumentsMethod, - sorter: grenad::Sorter, + original_sorter: grenad::Sorter, + flattened_sorter: grenad::Sorter, + replaced_documents_ids: RoaringBitmap, + new_documents_ids: RoaringBitmap, + new_external_documents_ids_builder: FxHashMap, u64>, documents_count: usize, } @@ -72,6 +82,9 @@ fn create_fields_mapping( .collect() } +/// Look for a key containing the [DEFAULT_PRIMARY_KEY_NAME] in the fields. +/// It doesn't look in the subfield because we don't want to enable the +/// primary key inference on nested objects. fn find_primary_key(index: &DocumentsBatchIndex) -> Option<&str> { index .iter() @@ -83,11 +96,12 @@ fn find_primary_key(index: &DocumentsBatchIndex) -> Option<&str> { impl<'a, 'i> Transform<'a, 'i> { pub fn new( + wtxn: &mut heed::RwTxn, index: &'i Index, indexer_settings: &'a IndexerConfig, index_documents_method: IndexDocumentsMethod, autogenerate_docids: bool, - ) -> Self { + ) -> Result { // We must choose the appropriate merge function for when two or more documents // with the same user id must be merged or fully replaced in the same batch. let merge_function = match index_documents_method { @@ -96,22 +110,36 @@ impl<'a, 'i> Transform<'a, 'i> { }; // We initialize the sorter with the user indexing settings. - let sorter = create_sorter( + let original_sorter = create_sorter( merge_function, indexer_settings.chunk_compression_type, indexer_settings.chunk_compression_level, indexer_settings.max_nb_chunks, - indexer_settings.max_memory, + indexer_settings.max_memory.map(|mem| mem / 2), ); - Transform { + // We initialize the sorter with the user indexing settings. 
+ let flattened_sorter = create_sorter( + merge_function, + indexer_settings.chunk_compression_type, + indexer_settings.chunk_compression_level, + indexer_settings.max_nb_chunks, + indexer_settings.max_memory.map(|mem| mem / 2), + ); + + Ok(Transform { index, + fields_ids_map: index.fields_ids_map(wtxn)?, indexer_settings, autogenerate_docids, - sorter, - documents_count: 0, + original_sorter, + flattened_sorter, index_documents_method, - } + replaced_documents_ids: RoaringBitmap::new(), + new_documents_ids: RoaringBitmap::new(), + new_external_documents_ids_builder: FxHashMap::default(), + documents_count: 0, + }) } pub fn read_documents( @@ -125,8 +153,11 @@ impl<'a, 'i> Transform<'a, 'i> { F: Fn(UpdateIndexingStep) + Sync, { let fields_index = reader.index(); - let mut fields_ids_map = self.index.fields_ids_map(wtxn)?; - let mapping = create_fields_mapping(&mut fields_ids_map, fields_index)?; + let external_documents_ids = self.index.external_documents_ids(wtxn)?; + let documents_ids = self.index.documents_ids(wtxn)?; + let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); + + let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?; let alternative_name = self .index @@ -136,15 +167,19 @@ impl<'a, 'i> Transform<'a, 'i> { let (primary_key_id, primary_key_name) = compute_primary_key_pair( self.index.primary_key(wtxn)?, - &mut fields_ids_map, + &mut self.fields_ids_map, alternative_name, self.autogenerate_docids, )?; + let primary_key_id_nested = primary_key_name.contains('.'); + + let mut flattened_document = None; let mut obkv_buffer = Vec::new(); + let mut flattened_obkv_buffer = Vec::new(); let mut documents_count = 0; let mut external_id_buffer = Vec::new(); - let mut field_buffer: Vec<(u16, &[u8])> = Vec::new(); + let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); while let Some((addition_index, document)) = reader.next_document_with_index()? { let mut field_buffer_cache = drop_and_reuse(field_buffer); if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { @@ -154,8 +189,9 @@ impl<'a, 'i> Transform<'a, 'i> { } for (k, v) in document.iter() { - let mapped_id = *mapping.get(&k).unwrap(); - field_buffer_cache.push((mapped_id, v)); + let mapped_id = + *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; + field_buffer_cache.push((mapped_id, Cow::from(v))); } // We need to make sure that every document has a primary key. After we have remapped @@ -164,87 +200,125 @@ impl<'a, 'i> Transform<'a, 'i> { // document. If none is found, and we were told to generate missing document ids, then // we create the missing field, and update the new document. 
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; - let external_id = - match field_buffer_cache.iter_mut().find(|(id, _)| *id == primary_key_id) { - Some((_, bytes)) => { - let value = match serde_json::from_slice(bytes).unwrap() { - Value::String(string) => match validate_document_id(&string) { - Some(s) if s.len() == string.len() => string, - Some(s) => s.to_string(), - None => { - return Err(UserError::InvalidDocumentId { - document_id: Value::String(string), - } - .into()) - } - }, - Value::Number(number) => number.to_string(), - content => { - return Err(UserError::InvalidDocumentId { - document_id: content.clone(), - } - .into()) - } - }; - serde_json::to_writer(&mut external_id_buffer, &value).unwrap(); - Cow::Owned(value) - } - None => { - if !self.autogenerate_docids { - let mut json = Map::new(); - for (key, value) in document.iter() { - let key = addition_index.name(key).cloned(); - let value = serde_json::from_slice::(&value).ok(); + let external_id = if primary_key_id_nested { + let mut field_buffer_cache = field_buffer_cache.clone(); + self.flatten_from_field_mapping( + &mapping, + &document, + &mut flattened_obkv_buffer, + &mut field_buffer_cache, + )?; + flattened_document = Some(&flattened_obkv_buffer); + let document = KvReader::new(&flattened_obkv_buffer); - if let Some((k, v)) = key.zip(value) { - json.insert(k, v); - } - } - - return Err(UserError::MissingDocumentId { - primary_key: primary_key_name, - document: json, - } - .into()); - } - - let uuid = - uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer); - serde_json::to_writer(&mut external_id_buffer, &uuid).unwrap(); - field_buffer_cache.push((primary_key_id, &external_id_buffer)); - Cow::Borrowed(&*uuid) - } - }; + update_primary_key( + document, + &addition_index, + primary_key_id, + &primary_key_name, + &mut uuid_buffer, + &mut field_buffer_cache, + &mut external_id_buffer, + self.autogenerate_docids, + )? + } else { + update_primary_key( + document, + &addition_index, + primary_key_id, + &primary_key_name, + &mut uuid_buffer, + &mut field_buffer_cache, + &mut external_id_buffer, + self.autogenerate_docids, + )? + }; // Insertion in a obkv need to be done with keys ordered. For now they are ordered // according to the document addition key order, so we sort it according to the // fieldids map keys order. field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(&f2)); - // The last step is to build the new obkv document, and insert it in the sorter. + // Build the new obkv document. let mut writer = obkv::KvWriter::new(&mut obkv_buffer); for (k, v) in field_buffer_cache.iter() { writer.insert(*k, v)?; } + let (docid, should_insert_original_document) = + match external_documents_ids.get(&*external_id) { + // if the document is in the db but has already been inserted + // (ie: already exists in the list of replaced documents ids), + // we should not add the original document a second time. 
+ Some(docid) => (docid, !self.replaced_documents_ids.contains(docid)), + None => { + // if the document has already been inserted in this + // batch we need to get its docid + match self + .new_external_documents_ids_builder + .entry(external_id.as_bytes().to_vec()) + { + Entry::Occupied(entry) => (*entry.get() as u32, false), + // if the document has never been encountered we give it a new docid + // and push this new docid to the external documents ids builder + Entry::Vacant(entry) => { + let new_docid = available_documents_ids + .next() + .ok_or(UserError::DocumentLimitReached)?; + entry.insert(new_docid as u64); + (new_docid, false) + } + } + } + }; + + if should_insert_original_document { + self.replaced_documents_ids.insert(docid); + + let key = BEU32::new(docid); + let base_obkv = self + .index + .documents + .remap_data_type::() + .get(wtxn, &key)? + .ok_or(InternalError::DatabaseMissingEntry { + db_name: db_name::DOCUMENTS, + key: None, + })?; + + self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?; + let buffer = self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))?; + + self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?; + } else { + self.new_documents_ids.insert(docid); + } + // We use the extracted/generated user id as the key for this document. - self.sorter.insert(&external_id.as_ref().as_bytes(), &obkv_buffer)?; + self.original_sorter.insert(&docid.to_be_bytes(), obkv_buffer.clone())?; documents_count += 1; + if let Some(flatten) = flattened_document { + self.flattened_sorter.insert(docid.to_be_bytes(), &flatten)?; + } else { + let buffer = self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))?; + self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?; + } + progress_callback(UpdateIndexingStep::RemapDocumentAddition { documents_seen: documents_count, }); - obkv_buffer.clear(); field_buffer = drop_and_reuse(field_buffer_cache); external_id_buffer.clear(); + obkv_buffer.clear(); } progress_callback(UpdateIndexingStep::RemapDocumentAddition { documents_seen: documents_count, }); - self.index.put_fields_ids_map(wtxn, &fields_ids_map)?; + self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; self.index.put_primary_key(wtxn, &primary_key_name)?; self.documents_count += documents_count; // Now that we have a valid sorter that contains the user id and the obkv we @@ -252,6 +326,87 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(documents_count) } + // Flatten a document from the fields ids map contained in self and insert the new + // created fields. + fn flatten_from_fields_ids_map(&mut self, obkv: KvReader) -> Result> { + let mut doc = serde_json::Map::new(); + + for (k, v) in obkv.iter() { + let key = self.fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId { + field_id: k, + process: "Flatten from fields ids map.", + })?; + let value = serde_json::from_slice::(v) + .map_err(crate::error::InternalError::SerdeJson)?; + doc.insert(key.to_string(), value); + } + + let flattened = flatten_serde_json::flatten(&doc); + + // Once we have the flattened version we can convert it back to obkv and + // insert all the new generated fields_ids (if any) in the fields ids map. 
+ let mut buffer: Vec = Vec::new(); + let mut writer = KvWriter::new(&mut buffer); + let mut flattened: Vec<_> = flattened.into_iter().collect(); + // we reorder the field to get all the known field first + flattened + .sort_unstable_by_key(|(key, _)| self.fields_ids_map.id(&key).unwrap_or(FieldId::MAX)); + + for (key, value) in flattened { + let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; + let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; + writer.insert(fid, &value)?; + } + + Ok(buffer) + } + + // Flatten a document from a field mapping generated by [create_fields_mapping] + fn flatten_from_field_mapping( + &mut self, + mapping: &HashMap, + obkv: &KvReader, + output_buffer: &mut Vec, + field_buffer_cache: &mut Vec<(u16, Cow<[u8]>)>, + ) -> Result<()> { + // if the primary_key is nested we need to flatten the document before being able to do anything + let mut doc = serde_json::Map::new(); + + for (k, v) in obkv.iter() { + let key = + mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; + let key = self.fields_ids_map.name(*key).ok_or(FieldIdMapMissingEntry::FieldId { + field_id: *key, + process: "Flatten from field mapping.", + })?; + let value = + serde_json::from_slice::(v).map_err(InternalError::SerdeJson)?; + doc.insert(key.to_string(), value); + } + + let flattened = flatten_serde_json::flatten(&doc); + + // Once we have the flattened version we can convert it back to obkv and + // insert all the new generated fields_ids (if any) in the fields ids map. + output_buffer.clear(); + let mut writer = KvWriter::new(output_buffer); + let mut flattened: Vec<_> = flattened.into_iter().collect(); + // we reorder the field to get all the known field first + flattened + .sort_unstable_by_key(|(key, _)| self.fields_ids_map.id(&key).unwrap_or(FieldId::MAX)); + + for (key, value) in flattened { + let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; + let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; + writer.insert(fid, &value)?; + if field_buffer_cache.iter().find(|(id, _)| *id == fid).is_none() { + field_buffer_cache.push((fid, value.into())); + } + } + + Ok(()) + } + /// Generate the `TransformOutput` based on the given sorter that can be generated from any /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document /// id for the user side and the value must be an obkv where keys are valid fields ids. @@ -268,110 +423,8 @@ impl<'a, 'i> Transform<'a, 'i> { .primary_key(&wtxn)? .ok_or(Error::UserError(UserError::MissingPrimaryKey))? .to_string(); - let fields_ids_map = self.index.fields_ids_map(wtxn)?; - let approximate_number_of_documents = self.documents_count; - let mut external_documents_ids = self.index.external_documents_ids(wtxn).unwrap(); - let documents_ids = self.index.documents_ids(wtxn)?; - let mut field_distribution = self.index.field_distribution(wtxn)?; - let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); - - // consume sorter, in order to free the internal allocation, before creating a new one. - let mut iter = self.sorter.into_stream_merger_iter()?; - - // Once we have sort and deduplicated the documents we write them into a final file. 
- let mut final_sorter = create_sorter( - |_id, obkvs| { - if obkvs.len() == 1 { - Ok(obkvs[0].clone()) - } else { - Err(InternalError::IndexingMergingKeys { process: "documents" }.into()) - } - }, - self.indexer_settings.chunk_compression_type, - self.indexer_settings.chunk_compression_level, - self.indexer_settings.max_nb_chunks, - self.indexer_settings.max_memory, - ); - let mut new_external_documents_ids_builder = fst::MapBuilder::memory(); - let mut replaced_documents_ids = RoaringBitmap::new(); - let mut new_documents_ids = RoaringBitmap::new(); - let mut obkv_buffer = Vec::new(); - - // While we write into final file we get or generate the internal documents ids. - let mut documents_count = 0; - while let Some((external_id, update_obkv)) = iter.next()? { - if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { - progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { - documents_seen: documents_count, - total_documents: approximate_number_of_documents, - }); - } - - let (docid, obkv) = match external_documents_ids.get(external_id) { - Some(docid) => { - // If we find the user id in the current external documents ids map - // we use it and insert it in the list of replaced documents. - replaced_documents_ids.insert(docid); - - let key = BEU32::new(docid); - let base_obkv = self.index.documents.get(wtxn, &key)?.ok_or( - InternalError::DatabaseMissingEntry { - db_name: db_name::DOCUMENTS, - key: None, - }, - )?; - - // we remove all the fields that were already counted - for (field_id, _) in base_obkv.iter() { - let field_name = fields_ids_map.name(field_id).unwrap(); - if let Entry::Occupied(mut entry) = - field_distribution.entry(field_name.to_string()) - { - match entry.get().checked_sub(1) { - Some(0) | None => entry.remove(), - Some(count) => entry.insert(count), - }; - } - } - - // Depending on the update indexing method we will merge - // the document update with the current document or not. - match self.index_documents_method { - IndexDocumentsMethod::ReplaceDocuments => (docid, update_obkv), - IndexDocumentsMethod::UpdateDocuments => { - let update_obkv = obkv::KvReader::new(update_obkv); - merge_two_obkvs(base_obkv, update_obkv, &mut obkv_buffer); - (docid, obkv_buffer.as_slice()) - } - } - } - None => { - // If this user id is new we add it to the external documents ids map - // for new ids and into the list of new documents. - let new_docid = - available_documents_ids.next().ok_or(UserError::DocumentLimitReached)?; - new_external_documents_ids_builder.insert(external_id, new_docid as u64)?; - new_documents_ids.insert(new_docid); - (new_docid, update_obkv) - } - }; - - // We insert the document under the documents ids map into the final file. - final_sorter.insert(docid.to_be_bytes(), obkv)?; - documents_count += 1; - - let reader = obkv::KvReader::new(obkv); - for (field_id, _) in reader.iter() { - let field_name = fields_ids_map.name(field_id).unwrap(); - *field_distribution.entry(field_name.to_string()).or_default() += 1; - } - } - - progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { - documents_seen: documents_count, - total_documents: documents_count, - }); + let mut external_documents_ids = self.index.external_documents_ids(wtxn)?; // We create a final writer to write the new documents in order from the sorter. 
let mut writer = create_writer( @@ -380,28 +433,103 @@ impl<'a, 'i> Transform<'a, 'i> { tempfile::tempfile()?, ); + // Once we have all the documents in the sorter, we write the documents + // in the writer. We also generate the field distribution. + let mut field_distribution = self.index.field_distribution(wtxn)?; + let mut iter = self.original_sorter.into_stream_merger_iter()?; + // used only for the callback + let mut documents_count = 0; + + while let Some((key, val)) = iter.next()? { + // send a callback to show at which step we are + documents_count += 1; + progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { + documents_seen: documents_count, + total_documents: self.documents_count, + }); + + let u32_key = key.clone().read_u32::()?; + // if the document was already in the db we remove all of its field + // from the field distribution. + if self.replaced_documents_ids.contains(u32_key) { + let obkv = self.index.documents.get(wtxn, &BEU32::new(u32_key))?.ok_or( + InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, + )?; + + for (key, _) in obkv.iter() { + let name = + self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Computing field distribution in transform.", + })?; + // We checked that the document was in the db earlier. If we can't find it it means + // there is an inconsistency between the field distribution and the field id map. + let field = field_distribution.get_mut(name).ok_or( + FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Accessing field distribution in transform.", + }, + )?; + *field -= 1; + if *field == 0 { + // since we were able to get the field right before it's safe to unwrap here + field_distribution.remove(name).unwrap(); + } + } + } + + // We increment all the field of the current document in the field distribution. + let obkv = KvReader::new(val); + + for (key, _) in obkv.iter() { + let name = + self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Computing field distribution in transform.", + })?; + *field_distribution.entry(name.to_string()).or_insert(0) += 1; + } + writer.insert(key, val)?; + } + + let mut original_documents = writer.into_inner()?; + // We then extract the file and reset the seek to be able to read it again. + original_documents.seek(SeekFrom::Start(0))?; + + // We create a final writer to write the new documents in order from the sorter. + let mut writer = create_writer( + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + tempfile::tempfile()?, + ); // Once we have written all the documents into the final sorter, we write the documents // into this writer, extract the file and reset the seek to be able to read it again. - final_sorter.write_into_stream_writer(&mut writer)?; - let mut documents_file = writer.into_inner()?; - documents_file.seek(SeekFrom::Start(0))?; + self.flattened_sorter.write_into_stream_writer(&mut writer)?; + let mut flattened_documents = writer.into_inner()?; + flattened_documents.seek(SeekFrom::Start(0))?; - let before_docids_merging = Instant::now(); - // We merge the new external ids with existing external documents ids. 
- let new_external_documents_ids = new_external_documents_ids_builder.into_map(); + let mut new_external_documents_ids_builder: Vec<_> = + self.new_external_documents_ids_builder.into_iter().collect(); + + new_external_documents_ids_builder + .sort_unstable_by(|(left, _), (right, _)| left.cmp(&right)); + let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory(); + new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| { + fst_new_external_documents_ids_builder.insert(key, value) + })?; + let new_external_documents_ids = fst_new_external_documents_ids_builder.into_map(); external_documents_ids.insert_ids(&new_external_documents_ids)?; - info!("Documents external merging took {:.02?}", before_docids_merging.elapsed()); - Ok(TransformOutput { primary_key, - fields_ids_map, + fields_ids_map: self.fields_ids_map, field_distribution, external_documents_ids: external_documents_ids.into_static(), - new_documents_ids, - replaced_documents_ids, - documents_count, - documents_file, + new_documents_ids: self.new_documents_ids, + replaced_documents_ids: self.replaced_documents_ids, + documents_count: self.documents_count, + original_documents, + flattened_documents, }) } @@ -412,7 +540,7 @@ impl<'a, 'i> Transform<'a, 'i> { self, wtxn: &mut heed::RwTxn, old_fields_ids_map: FieldsIdsMap, - new_fields_ids_map: FieldsIdsMap, + mut new_fields_ids_map: FieldsIdsMap, ) -> Result { // There already has been a document addition, the primary key should be set by now. let primary_key = @@ -423,7 +551,14 @@ impl<'a, 'i> Transform<'a, 'i> { let documents_count = documents_ids.len() as usize; // We create a final writer to write the new documents in order from the sorter. - let mut writer = create_writer( + let mut original_writer = create_writer( + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + tempfile::tempfile()?, + ); + + // We create a final writer to write the new documents in order from the sorter. + let mut flattened_writer = create_writer( self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, tempfile::tempfile()?, @@ -445,13 +580,51 @@ impl<'a, 'i> Transform<'a, 'i> { } let buffer = obkv_writer.into_inner()?; - writer.insert(docid.to_be_bytes(), buffer)?; + original_writer.insert(docid.to_be_bytes(), &buffer)?; + + // Once we have the document. We're going to flatten it + // and insert it in the flattened sorter. + let mut doc = serde_json::Map::new(); + + let reader = obkv::KvReader::new(buffer); + for (k, v) in reader.iter() { + let key = new_fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId { + field_id: k, + process: "Accessing field distribution in transform.", + })?; + let value = serde_json::from_slice::(v) + .map_err(InternalError::SerdeJson)?; + doc.insert(key.to_string(), value); + } + + let flattened = flatten_serde_json::flatten(&doc); + + // Once we have the flattened version we can convert it back to obkv and + // insert all the new generated fields_ids (if any) in the fields ids map. 
+ let mut buffer: Vec = Vec::new(); + let mut writer = KvWriter::new(&mut buffer); + let mut flattened: Vec<_> = flattened.into_iter().collect(); + // we reorder the field to get all the known field first + flattened.sort_unstable_by_key(|(key, _)| { + new_fields_ids_map.id(&key).unwrap_or(FieldId::MAX) + }); + + for (key, value) in flattened { + let fid = + new_fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; + let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; + writer.insert(fid, &value)?; + } + flattened_writer.insert(docid.to_be_bytes(), &buffer)?; } // Once we have written all the documents, we extract // the file and reset the seek to be able to read it again. - let mut documents_file = writer.into_inner()?; - documents_file.seek(SeekFrom::Start(0))?; + let mut original_documents = original_writer.into_inner()?; + original_documents.seek(SeekFrom::Start(0))?; + + let mut flattened_documents = flattened_writer.into_inner()?; + flattened_documents.seek(SeekFrom::Start(0))?; Ok(TransformOutput { primary_key, @@ -461,7 +634,8 @@ impl<'a, 'i> Transform<'a, 'i> { new_documents_ids: documents_ids, replaced_documents_ids: RoaringBitmap::default(), documents_count, - documents_file, + original_documents, + flattened_documents, }) } } @@ -521,11 +695,84 @@ fn drop_and_reuse(mut vec: Vec) -> Vec { vec.into_iter().map(|_| unreachable!()).collect() } +fn update_primary_key<'a>( + document: KvReader<'a, FieldId>, + addition_index: &DocumentsBatchIndex, + primary_key_id: FieldId, + primary_key_name: &str, + uuid_buffer: &'a mut [u8; uuid::adapter::Hyphenated::LENGTH], + field_buffer_cache: &mut Vec<(u16, Cow<'a, [u8]>)>, + mut external_id_buffer: &'a mut Vec, + autogenerate_docids: bool, +) -> Result> { + match field_buffer_cache.iter_mut().find(|(id, _)| *id == primary_key_id) { + Some((_, bytes)) => { + let value = match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? { + Value::String(string) => match validate_document_id(&string) { + Some(s) if s.len() == string.len() => string, + Some(s) => s.to_string(), + None => { + return Err(UserError::InvalidDocumentId { + document_id: Value::String(string), + } + .into()) + } + }, + Value::Number(number) => number.to_string(), + content => { + return Err(UserError::InvalidDocumentId { document_id: content.clone() }.into()) + } + }; + serde_json::to_writer(external_id_buffer, &value).map_err(InternalError::SerdeJson)?; + Ok(Cow::Owned(value)) + } + None if autogenerate_docids => { + let uuid = uuid::Uuid::new_v4().to_hyphenated().encode_lower(uuid_buffer); + serde_json::to_writer(&mut external_id_buffer, &uuid) + .map_err(InternalError::SerdeJson)?; + field_buffer_cache.push((primary_key_id, external_id_buffer.as_slice().into())); + Ok(Cow::Borrowed(&*uuid)) + } + None => { + let mut json = Map::new(); + for (key, value) in document.iter() { + let key = addition_index.name(key).cloned(); + let value = serde_json::from_slice::(&value).ok(); + + if let Some((k, v)) = key.zip(value) { + json.insert(k, v); + } + } + + Err(UserError::MissingDocumentId { + primary_key: primary_key_name.to_string(), + document: json, + })? 
+ } + } +} + +impl TransformOutput { + // find and insert the new field ids + pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result> { + let user_defined_facets = index.user_defined_faceted_fields(rtxn)?; + + Ok(self + .fields_ids_map + .names() + .filter(|&field| crate::is_faceted(field, &user_defined_facets)) + .map(|field| field.to_string()) + .collect()) + } +} + #[cfg(test)] mod test { use super::*; mod compute_primary_key { + use big_s::S; + use super::{compute_primary_key_pair, FieldsIdsMap}; #[test] @@ -540,6 +787,18 @@ mod test { ); assert_eq!(result.unwrap(), (0, "toto".to_string())); assert_eq!(fields_map.len(), 1); + + // and with nested fields + let mut fields_map = FieldsIdsMap::new(); + fields_map.insert("toto.tata").unwrap(); + let result = compute_primary_key_pair( + Some("toto.tata"), + &mut fields_map, + Some(S("titi")), + false, + ); + assert_eq!(result.unwrap(), (0, "toto.tata".to_string())); + assert_eq!(fields_map.len(), 1); } #[test] @@ -547,7 +806,7 @@ mod test { let mut fields_map = FieldsIdsMap::new(); let result = compute_primary_key_pair(None, &mut fields_map, Some("tata".to_string()), false); - assert_eq!(result.unwrap(), (0, "tata".to_string())); + assert_eq!(result.unwrap(), (0, S("tata"))); assert_eq!(fields_map.len(), 1); } @@ -555,7 +814,7 @@ mod test { fn should_return_default_if_both_are_none() { let mut fields_map = FieldsIdsMap::new(); let result = compute_primary_key_pair(None, &mut fields_map, None, true); - assert_eq!(result.unwrap(), (0, "id".to_string())); + assert_eq!(result.unwrap(), (0, S("id"))); assert_eq!(fields_map.len(), 1); } @@ -569,6 +828,7 @@ mod test { } mod primary_key_inference { + use big_s::S; use bimap::BiHashMap; use crate::documents::DocumentsBatchIndex; @@ -579,11 +839,11 @@ mod test { // We run the test multiple times to change the order in which the fields are iterated upon. for _ in 1..50 { let mut map = BiHashMap::new(); - map.insert(1, "fakeId".to_string()); - map.insert(2, "fakeId".to_string()); - map.insert(3, "fakeId".to_string()); - map.insert(4, "fakeId".to_string()); - map.insert(0, "realId".to_string()); + map.insert(1, S("fakeId")); + map.insert(2, S("fakeId")); + map.insert(3, S("fakeId")); + map.insert(4, S("fakeId")); + map.insert(0, S("realId")); assert_eq!(find_primary_key(&DocumentsBatchIndex(map)), Some("realId")); } diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index 7a26361d4..7dd37ccc2 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -249,11 +249,12 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { } let transform = Transform::new( + self.wtxn, &self.index, &self.indexer_config, IndexDocumentsMethod::ReplaceDocuments, false, - ); + )?; // We remap the documents fields based on the new `FieldsIdsMap`. let output = transform.remap_index_documents( @@ -262,6 +263,9 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { fields_ids_map.clone(), )?; + let new_facets = output.compute_real_facets(self.wtxn, self.index)?; + self.index.put_faceted_fields(self.wtxn, &new_facets)?; + // We clear the full database (words-fst, documents ids and documents content). 
ClearDocuments::new(self.wtxn, self.index).execute()?; @@ -273,7 +277,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { &self.indexer_config, IndexDocumentsConfig::default(), &cb, - ); + )?; indexing_builder.execute_raw(output)?; Ok(()) @@ -583,7 +587,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { { self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?; - let old_faceted_fields = self.index.faceted_fields(&self.wtxn)?; + let old_faceted_fields = self.index.user_defined_faceted_fields(&self.wtxn)?; let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?; self.update_displayed()?; @@ -599,7 +603,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { // If there is new faceted fields we indicate that we must reindex as we must // index new fields as facets. It means that the distinct attribute, // an Asc/Desc criterion or a filtered attribute as be added or removed. - let new_faceted_fields = self.index.faceted_fields(&self.wtxn)?; + let new_faceted_fields = self.index.user_defined_faceted_fields(&self.wtxn)?; let faceted_updated = old_faceted_fields != new_faceted_fields; let stop_words_updated = self.update_stop_words()?; @@ -651,7 +655,8 @@ mod tests { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -713,7 +718,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -764,7 +770,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -793,7 +800,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -846,7 +854,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -858,7 +867,6 @@ mod tests { // Only count the field_id 0 and level 0 facet values. // TODO we must support typed CSVs for numbers to be understood. 
let fidmap = index.fields_ids_map(&rtxn).unwrap(); - println!("fidmap: {:?}", fidmap); for document in index.all_documents(&rtxn).unwrap() { let document = document.unwrap(); let json = crate::obkv_to_json(&fidmap.ids().collect::>(), &fidmap, document.1) @@ -886,7 +894,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -927,7 +936,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -977,7 +987,51 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); + wtxn.commit().unwrap(); + + // Run an empty query just to ensure that the search results are ordered. + let rtxn = index.read_txn().unwrap(); + let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap(); + + // There must be at least one document with a 34 as the age. + assert_eq!(documents_ids.len(), 3); + } + + #[test] + fn set_nested_distinct_field() { + let path = tempfile::tempdir().unwrap(); + let mut options = EnvOpenOptions::new(); + options.map_size(10 * 1024 * 1024); // 10 MB + let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); + + // Set the filterable fields to be the age. + let mut wtxn = index.write_txn().unwrap(); + let mut builder = Settings::new(&mut wtxn, &index, &config); + // Don't display the generated `id` field. + builder.set_displayed_fields(vec![S("person")]); + builder.set_distinct_field(S("person.age")); + builder.execute(|_| ()).unwrap(); + + // Then index some documents. 
+ let content = documents!([ + { "person": { "name": "kevin", "age": 23 }}, + { "person": { "name": "kevina", "age": 21 }}, + { "person": { "name": "benoit", "age": 34 }}, + { "person": { "name": "bernard", "age": 34 }}, + { "person": { "name": "bertrand", "age": 34 }}, + { "person": { "name": "bernie", "age": 34 }}, + { "person": { "name": "ben", "age": 34 }} + ]); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1008,7 +1062,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1037,7 +1092,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1115,7 +1171,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); @@ -1252,7 +1309,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -1314,7 +1372,8 @@ mod tests { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()) + .unwrap(); builder.add_documents(content).unwrap(); builder.execute().unwrap(); wtxn.commit().unwrap(); diff --git a/milli/tests/search/mod.rs b/milli/tests/search/mod.rs index 52b4c7114..c72ca8ba3 100644 --- a/milli/tests/search/mod.rs +++ b/milli/tests/search/mod.rs @@ -59,7 +59,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); let mut cursor = Cursor::new(Vec::new()); let mut documents_builder = DocumentBatchBuilder::new(&mut cursor).unwrap(); let reader = Cursor::new(CONTENT.as_bytes()); diff --git 
a/milli/tests/search/query_criteria.rs b/milli/tests/search/query_criteria.rs index 786fdbcae..893d7c30a 100644 --- a/milli/tests/search/query_criteria.rs +++ b/milli/tests/search/query_criteria.rs @@ -390,7 +390,8 @@ fn criteria_ascdesc() { // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); let mut cursor = Cursor::new(Vec::new()); let mut batch_builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
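
Note on usage (an illustrative sketch, not part of the diff): the hunks above make IndexDocuments::new and Transform::new fallible, which is why every call site gains a trailing .unwrap() or ?, and they expose nested attributes through their flattened, dot-separated names. A minimal caller-side sketch of both points, with hypothetical names and setup, could look like the following; it mirrors the .unwrap() pattern in the tests but propagates errors with ? as an application would.

use std::io::{Read, Seek};

use milli::documents::DocumentBatchReader;
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig};
use milli::{Filter, Index, SearchResult};

// Index a batch of documents, then filter on a nested field through its
// flattened name, mirroring what `test_facets_generation` asserts above.
fn index_and_filter<R: Read + Seek>(
    index: &Index,
    config: &IndexerConfig,
    documents: DocumentBatchReader<R>,
) -> milli::Result<Vec<u32>> {
    let mut wtxn = index.write_txn()?;
    let indexing_config = IndexDocumentsConfig::default();
    // `IndexDocuments::new` returns a `Result` after this change, hence the `?`.
    let mut builder = IndexDocuments::new(&mut wtxn, index, config, indexing_config, |_| ())?;
    builder.add_documents(documents)?;
    builder.execute()?;
    wtxn.commit()?;

    // The nested attribute is addressed by its flattened name, quoted in the
    // filter expression because it contains dots and a space.
    let rtxn = index.read_txn()?;
    let mut search = index.search(&rtxn);
    if let Some(filter) = Filter::from_str(r#""dog.race.bernese mountain" = second"#)? {
        search.filter(filter);
    }
    let SearchResult { documents_ids, .. } = search.execute()?;
    Ok(documents_ids)
}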