diff --git a/infos/src/main.rs b/infos/src/main.rs
index e874385e6..3f41b7d42 100644
--- a/infos/src/main.rs
+++ b/infos/src/main.rs
@@ -16,23 +16,29 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
 const MAIN_DB_NAME: &str = "main";
 const WORD_DOCIDS_DB_NAME: &str = "word-docids";
+const WORD_PREFIX_DOCIDS_DB_NAME: &str = "word-prefix-docids";
 const DOCID_WORD_POSITIONS_DB_NAME: &str = "docid-word-positions";
 const WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME: &str = "word-pair-proximity-docids";
+const WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME: &str = "word-prefix-pair-proximity-docids";
 const DOCUMENTS_DB_NAME: &str = "documents";
 const USERS_IDS_DOCUMENTS_IDS: &[u8] = b"users-ids-documents-ids";
 
 const ALL_DATABASE_NAMES: &[&str] = &[
     MAIN_DB_NAME,
     WORD_DOCIDS_DB_NAME,
+    WORD_PREFIX_DOCIDS_DB_NAME,
     DOCID_WORD_POSITIONS_DB_NAME,
     WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME,
+    WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME,
     DOCUMENTS_DB_NAME,
 ];
 
 const POSTINGS_DATABASE_NAMES: &[&str] = &[
     WORD_DOCIDS_DB_NAME,
+    WORD_PREFIX_DOCIDS_DB_NAME,
     DOCID_WORD_POSITIONS_DB_NAME,
     WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME,
+    WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME,
 ];
 
 #[derive(Debug, StructOpt)]
@@ -85,6 +91,16 @@ enum Command {
         words: Vec<String>,
     },
 
+    /// Outputs a CSV with the documents ids where the given word prefixes appear.
+    WordsPrefixesDocids {
+        /// Display the whole documents ids in details.
+        #[structopt(long)]
+        full_display: bool,
+
+        /// The prefixes to display the documents ids of.
+        prefixes: Vec<String>,
+    },
+
     /// Outputs a CSV with the documents ids along with the facet values where it appears.
     FacetValuesDocids {
         /// Display the whole documents ids in details.
@@ -147,6 +163,12 @@ enum Command {
     /// you can install it using `cargo install fst-bin`.
     ExportWordsFst,
 
+    /// Outputs the words prefix FST to standard output.
+    ///
+    /// One can use the FST binary helper to dissect and analyze it,
+    /// you can install it using `cargo install fst-bin`.
+    ExportWordsPrefixFst,
+
     /// Outputs the documents as JSON lines to the standard output.
     ///
     /// All of the fields are extracted, not just the displayed ones.
@@ -186,6 +208,9 @@ fn run(opt: Opt) -> anyhow::Result<()> {
         MostCommonWords { limit } => most_common_words(&index, &rtxn, limit),
         BiggestValues { limit } => biggest_value_sizes(&index, &rtxn, limit),
         WordsDocids { full_display, words } => words_docids(&index, &rtxn, !full_display, words),
+        WordsPrefixesDocids { full_display, prefixes } => {
+            words_prefixes_docids(&index, &rtxn, !full_display, prefixes)
+        },
         FacetValuesDocids { full_display, field_name } => {
             facet_values_docids(&index, &rtxn, !full_display, field_name)
         },
@@ -201,6 +226,7 @@ fn run(opt: Opt) -> anyhow::Result<()> {
             word_pair_proximities_docids(&index, &rtxn, !full_display, word1, word2)
         },
         ExportWordsFst => export_words_fst(&index, &rtxn),
+        ExportWordsPrefixFst => export_words_prefix_fst(&index, &rtxn),
         ExportDocuments => export_documents(&index, &rtxn),
         PatchToNewExternalIds => {
             drop(rtxn);
@@ -311,8 +337,10 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
         env: _env,
         main,
         word_docids,
+        word_prefix_docids,
         docid_word_positions,
         word_pair_proximity_docids,
+        word_prefix_pair_proximity_docids,
         facet_field_id_value_docids,
         field_id_docid_facet_values: _,
         documents,
@@ -320,7 +348,9 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
 
     let main_name = "main";
     let word_docids_name = "word_docids";
+    let word_prefix_docids_name = "word_prefix_docids";
     let docid_word_positions_name = "docid_word_positions";
+    let word_prefix_pair_proximity_docids_name = "word_prefix_pair_proximity_docids";
     let word_pair_proximity_docids_name = "word_pair_proximity_docids";
     let facet_field_id_value_docids_name = "facet_field_id_value_docids";
     let documents_name = "documents";
@@ -328,8 +358,16 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
     let mut heap = BinaryHeap::with_capacity(limit + 1);
 
     if limit > 0 {
+        // Fetch the words FST
         let words_fst = index.words_fst(rtxn)?;
-        heap.push(Reverse((words_fst.as_fst().as_bytes().len(), format!("words-fst"), main_name)));
+        let length = words_fst.as_fst().as_bytes().len();
+        heap.push(Reverse((length, format!("words-fst"), main_name)));
+        if heap.len() > limit { heap.pop(); }
+
+        // Fetch the word prefix FST
+        let words_prefixes_fst = index.words_prefixes_fst(rtxn)?;
+        let length = words_prefixes_fst.as_fst().as_bytes().len();
+        heap.push(Reverse((length, format!("words-prefixes-fst"), main_name)));
         if heap.len() > limit { heap.pop(); }
 
         if let Some(documents_ids) = main.get::<_, Str, ByteSlice>(rtxn, "documents-ids")? {
@@ -343,6 +381,12 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
             if heap.len() > limit { heap.pop(); }
         }
 
+        for result in word_prefix_docids.remap_data_type::<ByteSlice>().iter(rtxn)? {
+            let (word, value) = result?;
+            heap.push(Reverse((value.len(), word.to_string(), word_prefix_docids_name)));
+            if heap.len() > limit { heap.pop(); }
+        }
+
         for result in docid_word_positions.remap_data_type::<ByteSlice>().iter(rtxn)? {
             let ((docid, word), value) = result?;
             let key = format!("{} {}", docid, word);
@@ -357,6 +401,13 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
             if heap.len() > limit { heap.pop(); }
         }
 
+        for result in word_prefix_pair_proximity_docids.remap_data_type::<ByteSlice>().iter(rtxn)? {
+            let ((word, prefix, prox), value) = result?;
+            let key = format!("{} {} {}", word, prefix, prox);
+            heap.push(Reverse((value.len(), key, word_prefix_pair_proximity_docids_name)));
+            if heap.len() > limit { heap.pop(); }
+        }
+
         let faceted_fields = index.faceted_fields_ids(rtxn)?;
         let fields_ids_map = index.fields_ids_map(rtxn)?;
         for (field_id, field_type) in faceted_fields {
@@ -426,6 +477,43 @@ fn words_docids(index: &Index, rtxn: &heed::RoTxn, debug: bool, words: Vec<Str
     Ok(wtr.flush()?)
 }
 
+fn words_prefixes_docids(
+    index: &Index,
+    rtxn: &heed::RoTxn,
+    debug: bool,
+    prefixes: Vec<String>,
+) -> anyhow::Result<()>
+{
+    let stdout = io::stdout();
+    let mut wtr = csv::Writer::from_writer(stdout.lock());
+    wtr.write_record(&["prefix", "documents_ids"])?;
+
+    if prefixes.is_empty() {
+        for result in index.word_prefix_docids.iter(rtxn)? {
+            let (prefix, docids) = result?;
+            let docids = if debug {
+                format!("{:?}", docids)
+            } else {
+                format!("{:?}", docids.iter().collect::<Vec<_>>())
+            };
+            wtr.write_record(&[prefix, &docids])?;
+        }
+    } else {
+        for prefix in prefixes {
+            if let Some(docids) = index.word_prefix_docids.get(rtxn, &prefix)? {
+                let docids = if debug {
+                    format!("{:?}", docids)
+                } else {
+                    format!("{:?}", docids.iter().collect::<Vec<_>>())
+                };
+                wtr.write_record(&[prefix, docids])?;
+            }
+        }
+    }
+
+    Ok(wtr.flush()?)
+}
+
 fn facet_values_docids(index: &Index, rtxn: &heed::RoTxn, debug: bool, field_name: String) -> anyhow::Result<()> {
     let fields_ids_map = index.fields_ids_map(&rtxn)?;
     let faceted_fields = index.faceted_fields_ids(&rtxn)?;
@@ -517,6 +605,16 @@ fn export_words_fst(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
     Ok(())
 }
 
+fn export_words_prefix_fst(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
+    use std::io::Write as _;
+
+    let mut stdout = io::stdout();
+    let words_prefixes_fst = index.words_prefixes_fst(rtxn)?;
+    stdout.write_all(words_prefixes_fst.as_fst().as_bytes())?;
+
+    Ok(())
+}
+
 fn export_documents(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
     use std::io::{BufWriter, Write as _};
     use milli::obkv_to_json;
@@ -627,9 +725,11 @@ fn size_of_database(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Re
 
     let database = match name {
         MAIN_DB_NAME => &index.main,
+        WORD_PREFIX_DOCIDS_DB_NAME => index.word_prefix_docids.as_polymorph(),
         WORD_DOCIDS_DB_NAME => index.word_docids.as_polymorph(),
         DOCID_WORD_POSITIONS_DB_NAME => index.docid_word_positions.as_polymorph(),
         WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME => index.word_pair_proximity_docids.as_polymorph(),
+        WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME => index.word_prefix_pair_proximity_docids.as_polymorph(),
         DOCUMENTS_DB_NAME => index.documents.as_polymorph(),
         unknown => anyhow::bail!("unknown database {:?}", unknown),
     };
@@ -675,24 +775,21 @@ fn database_stats(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Resu
     }
 
     values_length.sort_unstable();
+    let len = values_length.len();
 
-    let median = values_length.len() / 2;
-    let quartile = values_length.len() / 4;
-    let percentile = values_length.len() / 100;
-
-    let twenty_five_percentile = values_length.get(quartile).unwrap_or(&0);
-    let fifty_percentile = values_length.get(median).unwrap_or(&0);
-    let seventy_five_percentile = values_length.get(quartile * 3).unwrap_or(&0);
-    let ninety_percentile = values_length.get(percentile * 90).unwrap_or(&0);
-    let ninety_five_percentile = values_length.get(percentile * 95).unwrap_or(&0);
-    let ninety_nine_percentile = values_length.get(percentile * 99).unwrap_or(&0);
+    let twenty_five_percentile = values_length.get(len / 4).unwrap_or(&0);
+    let fifty_percentile = values_length.get(len / 2).unwrap_or(&0);
+    let seventy_five_percentile = values_length.get(len * 3 / 4).unwrap_or(&0);
+    let ninety_percentile = values_length.get(len * 90 / 100).unwrap_or(&0);
+    let ninety_five_percentile = values_length.get(len * 95 / 100).unwrap_or(&0);
+    let ninety_nine_percentile = values_length.get(len * 99 / 100).unwrap_or(&0);
     let minimum = values_length.first().unwrap_or(&0);
     let maximum = values_length.last().unwrap_or(&0);
     let count = values_length.len();
     let sum = values_length.iter().map(|l| *l as u64).sum::<u64>();
 
     println!("The {} database stats on the lengths", name);
-    println!("\tnumber of proximity pairs: {}", count);
+    println!("\tnumber of entries: {}", count);
     println!("\t25th percentile (first quartile): {}", twenty_five_percentile);
     println!("\t50th percentile (median): {}", fifty_percentile);
     println!("\t75th percentile (third quartile): {}", seventy_five_percentile);
@@ -714,6 +811,10 @@ fn database_stats(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Resu
             let db = index.word_docids.as_polymorph();
             compute_stats::<RoaringBitmapCodec>(*db, rtxn, name)
         },
+        WORD_PREFIX_DOCIDS_DB_NAME => {
+            let db = index.word_prefix_docids.as_polymorph();
+            compute_stats::<RoaringBitmapCodec>(*db, rtxn, name)
+        },
         DOCID_WORD_POSITIONS_DB_NAME => {
             let db = index.docid_word_positions.as_polymorph();
             compute_stats::<BoRoaringBitmapCodec>(*db, rtxn, name)
@@ -722,6 +823,10 @@ fn database_stats(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Resu
             let db = index.word_pair_proximity_docids.as_polymorph();
             compute_stats::<CboRoaringBitmapCodec>(*db, rtxn, name)
         },
+        WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME => {
+            let db = index.word_prefix_pair_proximity_docids.as_polymorph();
+            compute_stats::<CboRoaringBitmapCodec>(*db, rtxn, name)
+        },
         unknown => anyhow::bail!("unknown database {:?}", unknown),
     }
 }
diff --git a/milli/src/index.rs b/milli/src/index.rs
index c0dd22986..12ad86b22 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -27,6 +27,7 @@ pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields";
 pub const HARD_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "hard-external-documents-ids";
 pub const SOFT_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "soft-external-documents-ids";
 pub const WORDS_FST_KEY: &str = "words-fst";
+pub const WORDS_PREFIXES_FST_KEY: &str = "words-prefixes-fst";
 
 #[derive(Clone)]
 pub struct Index {
@@ -36,10 +37,14 @@ pub struct Index {
     pub main: PolyDatabase,
     /// A word and all the documents ids containing the word.
     pub word_docids: Database<Str, RoaringBitmapCodec>,
+    /// A word prefix and all the documents ids containing this prefix.
+    pub word_prefix_docids: Database<Str, RoaringBitmapCodec>,
    /// Maps a word and a document id (u32) to all the positions where the given word appears.
     pub docid_word_positions: Database<BEU32StrCodec, BoRoaringBitmapCodec>,
     /// Maps the proximity between a pair of words with all the docids where this relation appears.
     pub word_pair_proximity_docids: Database<StrStrU8Codec, CboRoaringBitmapCodec>,
+    /// Maps the proximity between a word and a prefix with all the docids where this relation appears.
+    pub word_prefix_pair_proximity_docids: Database<StrStrU8Codec, CboRoaringBitmapCodec>,
     /// Maps the facet field id and the globally ordered value with the docids that corresponds to it.
     pub facet_field_id_value_docids: Database<ByteSlice, CboRoaringBitmapCodec>,
     /// Maps the document id, the facet field id and the globally ordered value.
@@ -50,13 +55,15 @@ pub struct Index {
 
 impl Index {
     pub fn new<P: AsRef<Path>>(mut options: heed::EnvOpenOptions, path: P) -> anyhow::Result<Index> {
-        options.max_dbs(7);
+        options.max_dbs(9);
 
         let env = options.open(path)?;
         let main = env.create_poly_database(Some("main"))?;
         let word_docids = env.create_database(Some("word-docids"))?;
+        let word_prefix_docids = env.create_database(Some("word-prefix-docids"))?;
         let docid_word_positions = env.create_database(Some("docid-word-positions"))?;
         let word_pair_proximity_docids = env.create_database(Some("word-pair-proximity-docids"))?;
+        let word_prefix_pair_proximity_docids = env.create_database(Some("word-prefix-pair-proximity-docids"))?;
         let facet_field_id_value_docids = env.create_database(Some("facet-field-id-value-docids"))?;
         let field_id_docid_facet_values = env.create_database(Some("field-id-docid-facet-values"))?;
         let documents = env.create_database(Some("documents"))?;
@@ -65,8 +72,10 @@ impl Index {
             env,
             main,
             word_docids,
+            word_prefix_docids,
             docid_word_positions,
             word_pair_proximity_docids,
+            word_prefix_pair_proximity_docids,
             facet_field_id_value_docids,
             field_id_docid_facet_values,
             documents,
@@ -328,6 +337,23 @@ impl Index {
         }
     }
 
+    /* words prefixes fst */
+
+    /// Writes the FST which is the words prefixes dictionary of the engine.
+    pub fn put_words_prefixes_fst<A: AsRef<[u8]>>(&self, wtxn: &mut RwTxn, fst: &fst::Set<A>) -> heed::Result<()> {
+        self.main.put::<_, Str, ByteSlice>(wtxn, WORDS_PREFIXES_FST_KEY, fst.as_fst().as_bytes())
+    }
+
+    /// Returns the FST which is the words prefixes dictionary of the engine.
+    pub fn words_prefixes_fst<'t>(&self, rtxn: &'t RoTxn) -> anyhow::Result<fst::Set<Cow<'t, [u8]>>> {
+        match self.main.get::<_, Str, ByteSlice>(rtxn, WORDS_PREFIXES_FST_KEY)? {
+            Some(bytes) => Ok(fst::Set::new(bytes)?.map_data(Cow::Borrowed)?),
+            None => Ok(fst::Set::default().map_data(Cow::Owned)?),
+        }
+    }
+
+    /* documents */
+
     /// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
     pub fn documents<'t>(
         &self,
diff --git a/milli/src/lib.rs b/milli/src/lib.rs
index 7a9afde2d..66d134f4e 100644
--- a/milli/src/lib.rs
+++ b/milli/src/lib.rs
@@ -33,8 +33,8 @@ pub use self::update_store::UpdateStore;
 pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
 pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
 pub type SmallString32 = smallstr::SmallString<[u8; 32]>;
-pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
 pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>;
+pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
 pub type SmallVec8<T> = smallvec::SmallVec<[T; 8]>;
 pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>;
 pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs
index a84596901..1523a95b2 100644
--- a/milli/src/update/clear_documents.rs
+++ b/milli/src/update/clear_documents.rs
@@ -22,8 +22,10 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
             env: _env,
             main: _main,
             word_docids,
+            word_prefix_docids,
             docid_word_positions,
             word_pair_proximity_docids,
+            word_prefix_pair_proximity_docids,
             facet_field_id_value_docids,
             field_id_docid_facet_values,
             documents,
@@ -35,6 +37,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
 
         // We clean some of the main engine datastructures.
         self.index.put_words_fst(self.wtxn, &fst::Set::default())?;
+        self.index.put_words_prefixes_fst(self.wtxn, &fst::Set::default())?;
         self.index.put_external_documents_ids(self.wtxn, &ExternalDocumentsIds::default())?;
         self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?;
 
@@ -45,8 +48,10 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
 
         // Clear the other databases.
         word_docids.clear(self.wtxn)?;
+        word_prefix_docids.clear(self.wtxn)?;
         docid_word_positions.clear(self.wtxn)?;
         word_pair_proximity_docids.clear(self.wtxn)?;
+        word_prefix_pair_proximity_docids.clear(self.wtxn)?;
         facet_field_id_value_docids.clear(self.wtxn)?;
         field_id_docid_facet_values.clear(self.wtxn)?;
         documents.clear(self.wtxn)?;
diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs
index 932589dd7..5430bb3af 100644
--- a/milli/src/update/delete_documents.rs
+++ b/milli/src/update/delete_documents.rs
@@ -79,8 +79,10 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
             env: _env,
             main: _main,
             word_docids,
+            word_prefix_docids,
             docid_word_positions,
             word_pair_proximity_docids,
+            word_prefix_pair_proximity_docids,
             facet_field_id_value_docids,
             field_id_docid_facet_values,
             documents,
@@ -179,6 +181,61 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
         // We write the new words FST into the main database.
         self.index.put_words_fst(self.wtxn, &new_words_fst)?;
 
+        // We iterate over the word prefix docids database and remove the deleted documents ids
+        // from every docids list. We register the empty prefixes in an fst Set for future deletion.
+        let mut prefixes_to_delete = fst::SetBuilder::memory();
+        let mut iter = word_prefix_docids.iter_mut(self.wtxn)?;
+        while let Some(result) = iter.next() {
+            let (prefix, mut docids) = result?;
+            let previous_len = docids.len();
+            docids.difference_with(&self.documents_ids);
+            if docids.is_empty() {
+                iter.del_current()?;
+                prefixes_to_delete.insert(prefix)?;
+            } else if docids.len() != previous_len {
+                iter.put_current(prefix, &docids)?;
+            }
+        }
+
+        drop(iter);
+
+        // We compute the new prefix FST and write it only if there is a change.
+        let prefixes_to_delete = prefixes_to_delete.into_set();
+        if !prefixes_to_delete.is_empty() {
+            let new_words_prefixes_fst = {
+                // We retrieve the current words prefixes FST from the database.
+                let words_prefixes_fst = self.index.words_prefixes_fst(self.wtxn)?;
+                let difference = words_prefixes_fst.op().add(&prefixes_to_delete).difference();
+
+                // We stream the words prefixes that remain once the to-delete prefixes are removed.
+                let mut new_words_prefixes_fst_builder = fst::SetBuilder::memory();
+                new_words_prefixes_fst_builder.extend_stream(difference.into_stream())?;
+
+                // We create a words FST set from the above builder.
+                new_words_prefixes_fst_builder.into_set()
+            };
+
+            // We write the new words prefixes FST into the main database.
+            self.index.put_words_prefixes_fst(self.wtxn, &new_words_prefixes_fst)?;
+        }
+
+        // We delete the documents ids from the word prefix pair proximity database docids
+        // and remove the empty pairs too.
+        let db = word_prefix_pair_proximity_docids.remap_key_type::<ByteSlice>();
+        let mut iter = db.iter_mut(self.wtxn)?;
+        while let Some(result) = iter.next() {
+            let (key, mut docids) = result?;
+            let previous_len = docids.len();
+            docids.difference_with(&self.documents_ids);
+            if docids.is_empty() {
+                iter.del_current()?;
+            } else if docids.len() != previous_len {
+                iter.put_current(key, &docids)?;
+            }
+        }
+
+        drop(iter);
+
         // We delete the documents ids that are under the pairs of words,
         // it is faster and use no memory to iterate over all the words pairs than
         // to compute the cartesian product of every words of the deleted documents.
diff --git a/milli/src/update/facets.rs b/milli/src/update/facets.rs
index 522a4d350..bac5f3c86 100644
--- a/milli/src/update/facets.rs
+++ b/milli/src/update/facets.rs
@@ -32,7 +32,8 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
         wtxn: &'t mut heed::RwTxn<'i, 'u>,
         index: &'i Index,
         update_id: u64,
-    ) -> Facets<'t, 'u, 'i> {
+    ) -> Facets<'t, 'u, 'i>
+    {
         Facets {
             wtxn,
             index,
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index e38c640a0..d55f421dc 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -8,7 +8,7 @@ use std::time::Instant;
 
 use anyhow::Context;
 use bstr::ByteSlice as _;
-use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
+use grenad::{MergerIter, Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
 use heed::types::ByteSlice;
 use log::{debug, info, error};
 use memmap::Mmap;
@@ -17,9 +17,9 @@ use rayon::prelude::*;
 use serde::{Serialize, Deserialize};
 
 use crate::index::Index;
-use crate::update::{Facets, UpdateIndexingStep};
+use crate::update::{Facets, WordsPrefixes, UpdateIndexingStep};
 use self::store::{Store, Readers};
-use self::merge_function::{
+pub use self::merge_function::{
     main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
     docid_word_positions_merge, documents_merge, facet_field_value_docids_merge,
     field_id_docid_facet_values_merge,
@@ -102,39 +102,19 @@ pub fn merge_into_lmdb_database(
     sources: Vec<Reader<FileFuse>>,
     merge: MergeFn,
     method: WriteMethod,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<()>
+{
     debug!("Merging {} MTBL stores...", sources.len());
     let before = Instant::now();
 
     let merger = merge_readers(sources, merge);
-    let mut in_iter = merger.into_merge_iter()?;
-
-    match method {
-        WriteMethod::Append => {
-            let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
-            while let Some((k, v)) = in_iter.next()? {
-                out_iter.append(k, v).with_context(|| {
-                    format!("writing {:?} into LMDB", k.as_bstr())
-                })?;
-            }
-        },
-        WriteMethod::GetMergePut => {
-            while let Some((k, v)) = in_iter.next()? {
-                let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
-                match iter.next().transpose()? {
-                    Some((key, old_val)) if key == k => {
-                        let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
-                        let val = merge(k, &vals).expect("merge failed");
-                        iter.put_current(k, &val)?;
-                    },
-                    _ => {
-                        drop(iter);
-                        database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
-                    },
-                }
-            }
-        },
-    }
+    merger_iter_into_lmdb_database(
+        wtxn,
+        database,
+        merger.into_merge_iter()?,
+        merge,
+        method,
+    )?;
 
     debug!("MTBL stores merged in {:.02?}!", before.elapsed());
     Ok(())
@@ -146,7 +126,8 @@ pub fn write_into_lmdb_database(
     mut reader: Reader<FileFuse>,
     merge: MergeFn,
     method: WriteMethod,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<()>
+{
     debug!("Writing MTBL stores...");
     let before = Instant::now();
 
@@ -181,6 +162,67 @@ pub fn write_into_lmdb_database(
     Ok(())
 }
 
+pub fn sorter_into_lmdb_database(
+    wtxn: &mut heed::RwTxn,
+    database: heed::PolyDatabase,
+    sorter: Sorter<MergeFn>,
+    merge: MergeFn,
+    method: WriteMethod,
+) -> anyhow::Result<()>
+{
+    debug!("Writing MTBL sorter...");
+    let before = Instant::now();
+
+    merger_iter_into_lmdb_database(
+        wtxn,
+        database,
+        sorter.into_iter()?,
+        merge,
+        method,
+    )?;
+
+    debug!("MTBL sorter written in {:.02?}!", before.elapsed());
+    Ok(())
+}
+
+fn merger_iter_into_lmdb_database(
+    wtxn: &mut heed::RwTxn,
+    database: heed::PolyDatabase,
+    mut sorter: MergerIter<FileFuse, MergeFn>,
+    merge: MergeFn,
+    method: WriteMethod,
+) -> anyhow::Result<()>
+{
+    match method {
+        WriteMethod::Append => {
+            let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
+            while let Some((k, v)) = sorter.next()? {
+                out_iter.append(k, v).with_context(|| {
+                    format!("writing {:?} into LMDB", k.as_bstr())
+                })?;
+            }
+        },
+        WriteMethod::GetMergePut => {
+            while let Some((k, v)) = sorter.next()? {
+                let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
+                match iter.next().transpose()? {
+                    Some((key, old_val)) if key == k => {
+                        let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
+                        let val = merge(k, &vals).expect("merge failed");
+                        iter.put_current(k, &val)?;
+                    },
+                    _ => {
+                        drop(iter);
+                        database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
+                    },
+                }
+            }
+        },
+    }
+
+    Ok(())
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 #[non_exhaustive]
 pub enum IndexDocumentsMethod {
@@ -217,6 +259,8 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> {
     pub(crate) thread_pool: Option<&'a ThreadPool>,
     facet_level_group_size: Option<NonZeroUsize>,
     facet_min_level_size: Option<NonZeroUsize>,
+    words_prefix_threshold: Option<f64>,
+    max_prefix_length: Option<usize>,
     update_method: IndexDocumentsMethod,
     update_format: UpdateFormat,
     autogenerate_docids: bool,
@@ -242,6 +286,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             thread_pool: None,
             facet_level_group_size: None,
             facet_min_level_size: None,
+            words_prefix_threshold: None,
+            max_prefix_length: None,
             update_method: IndexDocumentsMethod::ReplaceDocuments,
             update_format: UpdateFormat::Json,
             autogenerate_docids: true,
@@ -625,6 +671,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             });
         }
 
+        // Run the facets update operation.
         let mut builder = Facets::new(self.wtxn, self.index, self.update_id);
         builder.chunk_compression_type = self.chunk_compression_type;
         builder.chunk_compression_level = self.chunk_compression_level;
@@ -637,6 +684,19 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         }
         builder.execute()?;
 
+        // Run the words prefixes update operation.
+        let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id);
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        if let Some(value) = self.words_prefix_threshold {
+            builder.threshold(value);
+        }
+        if let Some(value) = self.max_prefix_length {
+            builder.max_prefix_length(value);
+        }
+        builder.execute()?;
+
         debug_assert_eq!(database_count, total_databases);
 
         info!("Transform output indexed in {:.02?}", before_indexing.elapsed());
diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs
index 2cd532c83..fcdcb33e9 100644
--- a/milli/src/update/mod.rs
+++ b/milli/src/update/mod.rs
@@ -6,12 +6,14 @@ mod index_documents;
 mod settings;
 mod update_builder;
 mod update_step;
+mod words_prefixes;
 
 pub use self::available_documents_ids::AvailableDocumentsIds;
 pub use self::clear_documents::ClearDocuments;
 pub use self::delete_documents::DeleteDocuments;
-pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
 pub use self::facets::Facets;
+pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
 pub use self::settings::Settings;
 pub use self::update_builder::UpdateBuilder;
 pub use self::update_step::UpdateIndexingStep;
+pub use self::words_prefixes::WordsPrefixes;
diff --git a/milli/src/update/words_prefixes.rs b/milli/src/update/words_prefixes.rs
new file mode 100644
index 000000000..f7c898c89
--- /dev/null
+++ b/milli/src/update/words_prefixes.rs
@@ -0,0 +1,194 @@
+use std::iter::FromIterator;
+use std::str;
+
+use fst::automaton::Str;
+use fst::{Automaton, Streamer, IntoStreamer};
+use grenad::CompressionType;
+use heed::BytesEncode;
+use heed::types::ByteSlice;
+
+use crate::heed_codec::StrStrU8Codec;
+use crate::update::index_documents::WriteMethod;
+use crate::update::index_documents::{create_sorter, sorter_into_lmdb_database};
+use crate::update::index_documents::{word_docids_merge, words_pairs_proximities_docids_merge};
+use crate::{Index, SmallString32};
+
+pub struct WordsPrefixes<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'i, 'u>,
+    index: &'i Index,
+    pub(crate) chunk_compression_type: CompressionType,
+    pub(crate) chunk_compression_level: Option<u32>,
+    pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    pub(crate) max_nb_chunks: Option<usize>,
+    pub(crate) max_memory: Option<usize>,
+    threshold: f64,
+    max_prefix_length: usize,
+    _update_id: u64,
+}
+
+impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> {
+    pub fn new(
+        wtxn: &'t mut heed::RwTxn<'i, 'u>,
+        index: &'i Index,
+        update_id: u64,
+    ) -> WordsPrefixes<'t, 'u, 'i>
+    {
+        WordsPrefixes {
+            wtxn,
+            index,
+            chunk_compression_type: CompressionType::None,
+            chunk_compression_level: None,
+            chunk_fusing_shrink_size: None,
+            max_nb_chunks: None,
+            max_memory: None,
+            threshold: 0.01, // 1%
+            max_prefix_length: 4,
+            _update_id: update_id,
+        }
+    }
+
+    /// Set the ratio of words that a prefix must cover to be part of the words prefixes
+    /// database. If a prefix matches more than this ratio of the words in the dictionary,
+    /// it is added to the words prefixes datastructures.
+    ///
+    /// Default value is `0.01` or `1%`. This value must be between 0 and 1 and will be clamped
+    /// to these bounds otherwise.
+    pub fn threshold(&mut self, value: f64) -> &mut Self {
+        self.threshold = value.min(1.0).max(0.0); // clamp [0, 1]
+        self
+    }
+
+    /// Set the maximum length of prefixes in bytes.
+    ///
+    /// Default value is `4` bytes. This value must be between 1 and 25 and will be clamped
+    /// to these bounds otherwise.
+    pub fn max_prefix_length(&mut self, value: usize) -> &mut Self {
+        self.max_prefix_length = value.min(25).max(1); // clamp [1, 25]
+        self
+    }
+
+    pub fn execute(self) -> anyhow::Result<()> {
+        // Clear the words prefixes datastructures.
+        self.index.word_prefix_docids.clear(self.wtxn)?;
+        self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
+
+        let words_fst = self.index.words_fst(&self.wtxn)?;
+        let number_of_words = words_fst.len();
+        let min_number_of_words = (number_of_words as f64 * self.threshold) as usize;
+
+        // It is forbidden to keep a mutable reference into the database
+        // and write into it at the same time, therefore we write into another file.
+        let mut prefix_docids_sorter = create_sorter(
+            word_docids_merge,
+            self.chunk_compression_type,
+            self.chunk_compression_level,
+            self.chunk_fusing_shrink_size,
+            self.max_nb_chunks,
+            self.max_memory,
+        );
+
+        let mut prefix_fsts = Vec::with_capacity(self.max_prefix_length);
+        for n in 1..=self.max_prefix_length {
+
+            let mut current_prefix = SmallString32::new();
+            let mut current_prefix_count = 0;
+            let mut builder = fst::SetBuilder::memory();
+
+            let mut stream = words_fst.stream();
+            while let Some(bytes) = stream.next() {
+                // We try to get the first n bytes out of this string but we only want
+                // to split at valid character bounds. If we try to split in the middle of
+                // a character we ignore this word and go to the next one.
+                let word = str::from_utf8(bytes)?;
+                let prefix = match word.get(..n) {
+                    Some(prefix) => prefix,
+                    None => continue,
+                };
+
+                // This is the first iteration of the loop,
+                // or the current word doesn't start with the current prefix.
+                if current_prefix_count == 0 || prefix != current_prefix.as_str() {
+                    current_prefix = SmallString32::from(prefix);
+                    current_prefix_count = 0;
+                }
+
+                current_prefix_count += 1;
+
+                // There are enough words corresponding to this prefix to add it to the cache.
+                if current_prefix_count == min_number_of_words {
+                    builder.insert(prefix)?;
+                }
+            }
+
+            // We construct the final set for prefixes of size n.
+            prefix_fsts.push(builder.into_set());
+        }
+
+        // We merge all of the previously computed prefixes into one final set.
+        let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
+        let mut builder = fst::SetBuilder::memory();
+        builder.extend_stream(op.r#union())?;
+        let prefix_fst = builder.into_set();
+
+        // We iterate over all the prefixes and retrieve the corresponding docids.
+        let mut prefix_stream = prefix_fst.stream();
+        while let Some(bytes) = prefix_stream.next() {
+            let prefix = str::from_utf8(bytes)?;
+            let db = self.index.word_docids.remap_data_type::<ByteSlice>();
+            for result in db.prefix_iter(self.wtxn, prefix)? {
+                let (_word, data) = result?;
+                prefix_docids_sorter.insert(prefix, data)?;
+            }
+        }
+
+        // Set the words prefixes FST in the database.
+        self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?;
+
+        // We finally write the word prefix docids into the LMDB database.
+        sorter_into_lmdb_database(
+            self.wtxn,
+            *self.index.word_prefix_docids.as_polymorph(),
+            prefix_docids_sorter,
+            word_docids_merge,
+            WriteMethod::Append,
+        )?;
+
+        // We compute the word prefix pair proximity database.
+
+        // Here we create a sorter akin to the previous one.
+        let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
+            words_pairs_proximities_docids_merge,
+            self.chunk_compression_type,
+            self.chunk_compression_level,
+            self.chunk_fusing_shrink_size,
+            self.max_nb_chunks,
+            self.max_memory,
+        );
+
+        // We insert all the word pairs corresponding to the word-prefix pairs
+        // where the prefixes appear in the prefix FST previously constructed.
+        let db = self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>();
+        for result in db.iter(self.wtxn)? {
+            let ((word1, word2, prox), data) = result?;
+            let automaton = Str::new(word2).starts_with();
+            let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
+            while let Some(prefix) = matching_prefixes.next() {
+                let prefix = str::from_utf8(prefix)?;
+                let pair = (word1, prefix, prox);
+                let bytes = StrStrU8Codec::bytes_encode(&pair).unwrap();
+                word_prefix_pair_proximity_docids_sorter.insert(bytes, data)?;
+            }
+        }
+
+        // We finally write the word prefix pair proximity docids into the LMDB database.
+        sorter_into_lmdb_database(
+            self.wtxn,
+            *self.index.word_prefix_pair_proximity_docids.as_polymorph(),
+            word_prefix_pair_proximity_docids_sorter,
+            words_pairs_proximities_docids_merge,
+            WriteMethod::Append,
+        )?;
+
+        Ok(())
+    }
+}
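
Note (not part of the patch): a minimal usage sketch of the new `WordsPrefixes` update, assuming an already populated milli index. The path `my-index.mmdb`, the map size, and the update id `0` are placeholders, and `Index::write_txn` comes from the existing codebase rather than this diff.

    use heed::EnvOpenOptions;
    use milli::Index;
    use milli::update::WordsPrefixes;

    fn main() -> anyhow::Result<()> {
        // Open an existing milli index (the directory must already exist).
        let mut options = EnvOpenOptions::new();
        options.map_size(10 * 1024 * 1024 * 1024); // 10 GiB, placeholder
        let index = Index::new(options, "my-index.mmdb")?;

        // Recompute the word prefix databases with custom settings.
        let mut wtxn = index.write_txn()?;
        let mut builder = WordsPrefixes::new(&mut wtxn, &index, 0);
        builder.threshold(0.05);      // a prefix must cover at least 5% of the words
        builder.max_prefix_length(2); // only compute prefixes of 1 and 2 bytes
        builder.execute()?;
        wtxn.commit()?;

        Ok(())
    }

The builder clears `word-prefix-docids` and `word-prefix-pair-proximity-docids` and rebuilds them from the current words FST, so it is safe to rerun it after a settings change.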
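Also not part of the patch: the prefix selection rule used in `WordsPrefixes::execute` (stream the sorted words, count how many share each prefix of length 1..=max, keep a prefix once it covers the threshold ratio of the dictionary) shown as a small self-contained sketch without fst or LMDB; `select_prefixes` is a hypothetical helper name.

    /// Returns the prefixes of length 1..=max_len covering at least `threshold`
    /// (a ratio in [0, 1]) of the words. `words` must be sorted, like the words FST.
    fn select_prefixes(words: &[&str], threshold: f64, max_len: usize) -> Vec<String> {
        let min_count = std::cmp::max(1, (words.len() as f64 * threshold) as usize);
        let mut prefixes = Vec::new();

        for n in 1..=max_len {
            let mut current_prefix = String::new();
            let mut current_count = 0usize;

            for word in words {
                // Skip words shorter than n bytes or cut inside a UTF-8 character,
                // mirroring the `word.get(..n)` check in the update.
                let prefix = match word.get(..n) {
                    Some(prefix) => prefix,
                    None => continue,
                };

                // First word of this prefix group: reset the counter.
                if current_count == 0 || prefix != current_prefix {
                    current_prefix = prefix.to_string();
                    current_count = 0;
                }

                current_count += 1;

                // Register the prefix the first time it reaches the threshold.
                if current_count == min_count {
                    prefixes.push(prefix.to_string());
                }
            }
        }

        prefixes
    }

    fn main() {
        // A sorted word list, like the words FST streamed by the update.
        let words = ["hell", "hello", "help", "house", "world"];
        // With a 40% threshold (2 words out of 5) only "h", "he" and "hel" qualify.
        println!("{:?}", select_prefixes(&words, 0.4, 3));
    }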