From e65bad16ccd273625a624d44bde06e01eaf08bdb Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 25 Mar 2021 11:10:12 +0100
Subject: [PATCH] Compute the words prefixes at the end of an update

---
 http-ui/src/main.rs                           |  68 ------
 infos/src/main.rs                             |   1 +
 milli/src/index.rs                            |   6 +-
 milli/src/update/clear_documents.rs           |   2 +
 milli/src/update/delete_documents.rs          |  16 ++
 .../update/index_documents/merge_function.rs  |   4 +
 milli/src/update/index_documents/mod.rs       |  37 +++-
 milli/src/update/mod.rs                       |   9 +-
 milli/src/update/update_builder.rs            |  35 +---
 milli/src/update/word_prefix_docids.rs        |  75 +++++++
 .../word_prefix_pair_proximity_docids.rs      |  89 ++++++++
 milli/src/update/words_level_positions.rs     |  90 ++++++--
 milli/src/update/words_prefixes.rs            | 196 ------------------
 milli/src/update/words_prefixes_fst.rs        | 104 ++++++++++
 14 files changed, 409 insertions(+), 323 deletions(-)
 create mode 100644 milli/src/update/word_prefix_docids.rs
 create mode 100644 milli/src/update/word_prefix_pair_proximity_docids.rs
 delete mode 100644 milli/src/update/words_prefixes.rs
 create mode 100644 milli/src/update/words_prefixes_fst.rs

diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs
index c85bd9b15..00618f58a 100644
--- a/http-ui/src/main.rs
+++ b/http-ui/src/main.rs
@@ -228,8 +228,6 @@ enum UpdateMeta {
     ClearDocuments,
     Settings(Settings),
     Facets(Facets),
-    WordsPrefixes(WordsPrefixes),
-    WordsLevelPositions(WordsLevelPositions),
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -290,14 +288,6 @@ struct WordsLevelPositions {
     min_level_size: Option<NonZeroU32>,
 }
 
-// Any value that is present is considered Some value, including null.
-fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
-where T: Deserialize<'de>,
-      D: Deserializer<'de>
-{
-    Deserialize::deserialize(deserializer).map(Some)
-}
-
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     let opt = Opt::from_args();
@@ -496,36 +486,6 @@ async fn main() -> anyhow::Result<()> {
                             Err(e) => Err(e)
                         }
                     }
-                    UpdateMeta::WordsPrefixes(settings) => {
-                        // We must use the write transaction of the update here.
-                        let mut wtxn = index_cloned.write_txn()?;
-                        let mut builder = update_builder.words_prefixes(&mut wtxn, &index_cloned);
-                        if let Some(value) = settings.threshold {
-                            builder.threshold(value);
-                        }
-                        if let Some(value) = settings.max_prefix_length {
-                            builder.max_prefix_length(value);
-                        }
-                        match builder.execute() {
-                            Ok(()) => wtxn.commit().map_err(Into::into),
-                            Err(e) => Err(e)
-                        }
-                    },
-                    UpdateMeta::WordsLevelPositions(levels) => {
-                        // We must use the write transaction of the update here.
-                        let mut wtxn = index_cloned.write_txn()?;
-                        let mut builder = update_builder.words_level_positions(&mut wtxn, &index_cloned);
-                        if let Some(value) = levels.level_group_size {
-                            builder.level_group_size(value);
-                        }
-                        if let Some(value) = levels.min_level_size {
-                            builder.min_level_size(value);
-                        }
-                        match builder.execute() {
-                            Ok(()) => wtxn.commit().map_err(Into::into),
-                            Err(e) => Err(e.into())
-                        }
-                    }
                 };
 
                 let meta = match result {
@@ -942,32 +902,6 @@ async fn main() -> anyhow::Result<()> {
             warp::reply()
         });
 
-    let update_store_cloned = update_store.clone();
-    let update_status_sender_cloned = update_status_sender.clone();
-    let change_words_prefixes_route = warp::filters::method::post()
-        .and(warp::path!("words-prefixes"))
-        .and(warp::body::json())
-        .map(move |settings: WordsPrefixes| {
-            let meta = UpdateMeta::WordsPrefixes(settings);
-            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
-            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
-            eprintln!("update {} registered", update_id);
-            warp::reply()
-        });
-
-    let update_store_cloned = update_store.clone();
-    let update_status_sender_cloned = update_status_sender.clone();
-    let change_words_level_positions_route = warp::filters::method::post()
-        .and(warp::path!("words-level-positions"))
-        .and(warp::body::json())
-        .map(move |levels: WordsLevelPositions| {
-            let meta = UpdateMeta::WordsLevelPositions(levels);
-            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
-            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
-            eprintln!("update {} registered", update_id);
-            warp::reply()
-        });
-
     let update_store_cloned = update_store.clone();
     let update_status_sender_cloned = update_status_sender.clone();
     let abort_update_id_route = warp::filters::method::delete()
@@ -1042,8 +976,6 @@ async fn main() -> anyhow::Result<()> {
         .or(clearing_route)
         .or(change_settings_route)
        .or(change_facet_levels_route)
-        .or(change_words_prefixes_route)
-        .or(change_words_level_positions_route)
        .or(update_ws_route);
 
     let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
diff --git a/infos/src/main.rs b/infos/src/main.rs
index 0e6403d7b..e730a8b43 100644
--- a/infos/src/main.rs
+++ b/infos/src/main.rs
@@ -338,6 +338,7 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyhow::Result<()> {
         facet_field_id_value_docids,
         field_id_docid_facet_values: _,
         documents,
+        ..
     } = index;
 
     let main_name = "main";
diff --git a/milli/src/index.rs b/milli/src/index.rs
index 0659b207a..ba7747250 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -54,6 +54,8 @@ pub struct Index {
     pub word_prefix_pair_proximity_docids: Database<StrStrU8Codec, CboRoaringBitmapCodec>,
     /// Maps the word, level and position range with the docids that corresponds to it.
     pub word_level_position_docids: Database<StrLevelPositionCodec, CboRoaringBitmapCodec>,
+    /// Maps the level positions of a word prefix with all the docids where this prefix appears.
+    pub word_prefix_level_position_docids: Database<StrLevelPositionCodec, CboRoaringBitmapCodec>,
     /// Maps the facet field id and the globally ordered value with the docids that corresponds to it.
     pub facet_field_id_value_docids: Database<ByteSlice, CboRoaringBitmapCodec>,
     /// Maps the document id, the facet field id and the globally ordered value.
@@ -64,7 +66,7 @@ pub struct Index {
 
 impl Index {
     pub fn new<P: AsRef<Path>>(mut options: heed::EnvOpenOptions, path: P) -> anyhow::Result<Index> {
-        options.max_dbs(10);
+        options.max_dbs(11);
         let env = options.open(path)?;
         let main = env.create_poly_database(Some("main"))?;
@@ -74,6 +76,7 @@ impl Index {
         let word_pair_proximity_docids = env.create_database(Some("word-pair-proximity-docids"))?;
         let word_prefix_pair_proximity_docids = env.create_database(Some("word-prefix-pair-proximity-docids"))?;
         let word_level_position_docids = env.create_database(Some("word-level-position-docids"))?;
+        let word_prefix_level_position_docids = env.create_database(Some("word-prefix-level-position-docids"))?;
         let facet_field_id_value_docids = env.create_database(Some("facet-field-id-value-docids"))?;
         let field_id_docid_facet_values = env.create_database(Some("field-id-docid-facet-values"))?;
         let documents = env.create_database(Some("documents"))?;
@@ -98,6 +101,7 @@ impl Index {
             word_pair_proximity_docids,
             word_prefix_pair_proximity_docids,
             word_level_position_docids,
+            word_prefix_level_position_docids,
             facet_field_id_value_docids,
             field_id_docid_facet_values,
             documents,
diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs
index 6d7dd72b8..f89c2d00c 100644
--- a/milli/src/update/clear_documents.rs
+++ b/milli/src/update/clear_documents.rs
@@ -29,6 +29,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
             word_pair_proximity_docids,
             word_prefix_pair_proximity_docids,
             word_level_position_docids,
+            word_prefix_level_position_docids,
             facet_field_id_value_docids,
             field_id_docid_facet_values,
             documents,
@@ -57,6 +58,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
         word_pair_proximity_docids.clear(self.wtxn)?;
         word_prefix_pair_proximity_docids.clear(self.wtxn)?;
         word_level_position_docids.clear(self.wtxn)?;
+        word_prefix_level_position_docids.clear(self.wtxn)?;
         facet_field_id_value_docids.clear(self.wtxn)?;
         field_id_docid_facet_values.clear(self.wtxn)?;
         documents.clear(self.wtxn)?;
diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs
index f9303d339..4c5f8d61a 100644
--- a/milli/src/update/delete_documents.rs
+++ b/milli/src/update/delete_documents.rs
@@ -89,6 +89,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
             word_pair_proximity_docids,
             word_prefix_pair_proximity_docids,
             word_level_position_docids,
+            word_prefix_level_position_docids,
             facet_field_id_value_docids,
             field_id_docid_facet_values,
             documents,
@@ -345,6 +346,21 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
 
         drop(iter);
 
+        // We delete the documents ids that are under the word prefix level position docids.
+        let mut iter = word_prefix_level_position_docids.iter_mut(self.wtxn)?.remap_key_type::<ByteSlice>();
+        while let Some(result) = iter.next() {
+            let (bytes, mut docids) = result?;
+            let previous_len = docids.len();
+            docids.difference_with(&self.documents_ids);
+            if docids.is_empty() {
+                iter.del_current()?;
+            } else if docids.len() != previous_len {
+                iter.put_current(bytes, &docids)?;
+            }
+        }
+
+        drop(iter);
+
         Ok(self.documents_ids.len())
     }
 }
diff --git a/milli/src/update/index_documents/merge_function.rs b/milli/src/update/index_documents/merge_function.rs
index 54f994fc0..a6d008513 100644
--- a/milli/src/update/index_documents/merge_function.rs
+++ b/milli/src/update/index_documents/merge_function.rs
@@ -52,6 +52,10 @@ pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
     cbo_roaring_bitmap_merge(values)
 }
 
+pub fn word_prefix_level_positions_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
+    cbo_roaring_bitmap_merge(values)
+}
+
 pub fn word_level_position_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
     cbo_roaring_bitmap_merge(values)
 }
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index 7a2196481..8ebdf1634 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -3,6 +3,7 @@ use std::collections::HashSet;
 use std::fs::File;
 use std::io::{self, Seek, SeekFrom};
 use std::num::{NonZeroU32, NonZeroUsize};
+use std::str;
 use std::sync::mpsc::sync_channel;
 use std::time::Instant;
 
@@ -13,18 +14,21 @@ use grenad::{MergerIter, Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
 use heed::types::ByteSlice;
 use log::{debug, info, error};
 use memmap::Mmap;
-use rayon::ThreadPool;
 use rayon::prelude::*;
+use rayon::ThreadPool;
 use serde::{Serialize, Deserialize};
 
 use crate::index::Index;
-use crate::update::{Facets, WordsLevelPositions, WordsPrefixes, UpdateIndexingStep};
+use crate::update::{
+    Facets, WordsLevelPositions, WordPrefixDocids, WordsPrefixesFst, UpdateIndexingStep,
+    WordPrefixPairProximityDocids,
+};
 
 use self::store::{Store, Readers};
 pub use self::merge_function::{
     main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
     docid_word_positions_merge, documents_merge,
-    word_level_position_docids_merge, facet_field_value_docids_merge,
-    field_id_docid_facet_values_merge,
+    word_level_position_docids_merge, word_prefix_level_positions_docids_merge,
+    facet_field_value_docids_merge, field_id_docid_facet_values_merge,
 };
 pub use self::transform::{Transform, TransformOutput};
 
@@ -719,10 +723,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         builder.execute()?;
 
         // Run the words prefixes update operation.
-        let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id);
-        builder.chunk_compression_type = self.chunk_compression_type;
-        builder.chunk_compression_level = self.chunk_compression_level;
-        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        let mut builder = WordsPrefixesFst::new(self.wtxn, self.index, self.update_id);
         if let Some(value) = self.words_prefix_threshold {
             builder.threshold(value);
         }
@@ -731,8 +732,26 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         }
         builder.execute()?;
 
+        // Run the word prefix docids update operation.
+        let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        builder.max_nb_chunks = self.max_nb_chunks;
+        builder.max_memory = self.max_memory;
+        builder.execute()?;
+
+        // Run the word prefix pair proximity docids update operation.
+        let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        builder.max_nb_chunks = self.max_nb_chunks;
+        builder.max_memory = self.max_memory;
+        builder.execute()?;
+
         // Run the words level positions update operation.
-        let mut builder = WordsLevelPositions::new(self.wtxn, self.index, self.update_id);
+        let mut builder = WordsLevelPositions::new(self.wtxn, self.index);
         builder.chunk_compression_type = self.chunk_compression_type;
         builder.chunk_compression_level = self.chunk_compression_level;
         builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs
index 1fc4890fb..203937e2f 100644
--- a/milli/src/update/mod.rs
+++ b/milli/src/update/mod.rs
@@ -6,8 +6,10 @@ pub use self::index_documents::{DocumentAdditionResult, IndexDocuments, IndexDocumentsMethod, UpdateFormat};
 pub use self::settings::{Setting, Settings};
 pub use self::update_builder::UpdateBuilder;
 pub use self::update_step::UpdateIndexingStep;
+pub use self::word_prefix_docids::WordPrefixDocids;
+pub use self::word_prefix_pair_proximity_docids::WordPrefixPairProximityDocids;
 pub use self::words_level_positions::WordsLevelPositions;
-pub use self::words_prefixes::WordsPrefixes;
+pub use self::words_prefixes_fst::WordsPrefixesFst;
 
 mod available_documents_ids;
 mod clear_documents;
@@ -17,6 +19,7 @@ mod index_documents;
 mod settings;
 mod update_builder;
 mod update_step;
+mod word_prefix_docids;
+mod word_prefix_pair_proximity_docids;
 mod words_level_positions;
-mod words_prefixes;
-
+mod words_prefixes_fst;
diff --git a/milli/src/update/update_builder.rs b/milli/src/update/update_builder.rs
index 9a4fb850e..8d6eb034d 100644
--- a/milli/src/update/update_builder.rs
+++ b/milli/src/update/update_builder.rs
@@ -2,10 +2,7 @@ use grenad::CompressionType;
 use rayon::ThreadPool;
 
 use crate::Index;
-use super::{
-    ClearDocuments, DeleteDocuments, IndexDocuments, Settings,
-    Facets, WordsPrefixes, WordsLevelPositions,
-};
+use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings, Facets};
 
 pub struct UpdateBuilder<'a> {
     pub(crate) log_every_n: Option<usize>,
@@ -138,34 +135,4 @@ impl<'a> UpdateBuilder<'a> {
 
         builder
     }
-
-    pub fn words_prefixes<'t, 'u, 'i>(
-        self,
-        wtxn: &'t mut heed::RwTxn<'i, 'u>,
-        index: &'i Index,
-    ) -> WordsPrefixes<'t, 'u, 'i>
-    {
-        let mut builder = WordsPrefixes::new(wtxn, index, self.update_id);
-
-        builder.chunk_compression_type = self.chunk_compression_type;
-        builder.chunk_compression_level = self.chunk_compression_level;
-        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
-
-        builder
-    }
-
-    pub fn words_level_positions<'t, 'u, 'i>(
-        self,
-        wtxn: &'t mut heed::RwTxn<'i, 'u>,
-        index: &'i Index,
-    ) -> WordsLevelPositions<'t, 'u, 'i>
-    {
-        let mut builder = WordsLevelPositions::new(wtxn, index, self.update_id);
-
-        builder.chunk_compression_type = self.chunk_compression_type;
-        builder.chunk_compression_level = self.chunk_compression_level;
-        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
-
-        builder
-    }
 }
diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs
new file mode 100644
index 000000000..58c984212
--- /dev/null
+++ b/milli/src/update/word_prefix_docids.rs
@@ -0,0 +1,75 @@
+use std::str;
+
+use crate::Index;
+use fst::Streamer;
+use grenad::CompressionType;
+use heed::types::ByteSlice;
+
+use crate::update::index_documents::WriteMethod;
+use crate::update::index_documents::{create_sorter, word_docids_merge, sorter_into_lmdb_database};
+
+pub struct WordPrefixDocids<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'i, 'u>,
+    index: &'i Index,
+    pub(crate) chunk_compression_type: CompressionType,
+    pub(crate) chunk_compression_level: Option<u32>,
+    pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    pub(crate) max_nb_chunks: Option<usize>,
+    pub(crate) max_memory: Option<usize>,
+}
+
+impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
+    pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> WordPrefixDocids<'t, 'u, 'i> {
+        WordPrefixDocids {
+            wtxn,
+            index,
+            chunk_compression_type: CompressionType::None,
+            chunk_compression_level: None,
+            chunk_fusing_shrink_size: None,
+            max_nb_chunks: None,
+            max_memory: None,
+        }
+    }
+
+    pub fn execute(self) -> anyhow::Result<()> {
+        // Clear the word prefix docids database.
+        self.index.word_prefix_docids.clear(self.wtxn)?;
+
+        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+
+        // It is forbidden to keep a mutable reference into the database
+        // and write into it at the same time, therefore we write into another file.
+        let mut prefix_docids_sorter = create_sorter(
+            word_docids_merge,
+            self.chunk_compression_type,
+            self.chunk_compression_level,
+            self.chunk_fusing_shrink_size,
+            self.max_nb_chunks,
+            self.max_memory,
+        );
+
+        // We iterate over all the prefixes and retrieve the corresponding docids.
+        let mut prefix_stream = prefix_fst.stream();
+        while let Some(bytes) = prefix_stream.next() {
+            let prefix = str::from_utf8(bytes)?;
+            let db = self.index.word_docids.remap_data_type::<ByteSlice>();
+            for result in db.prefix_iter(self.wtxn, prefix)? {
+                let (_word, data) = result?;
+                prefix_docids_sorter.insert(prefix, data)?;
+            }
+        }
+
+        drop(prefix_fst);
+
+        // We finally write the word prefix docids into the LMDB database.
+        sorter_into_lmdb_database(
+            self.wtxn,
+            *self.index.word_prefix_docids.as_polymorph(),
+            prefix_docids_sorter,
+            word_docids_merge,
+            WriteMethod::Append,
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs
new file mode 100644
index 000000000..c972efc4f
--- /dev/null
+++ b/milli/src/update/word_prefix_pair_proximity_docids.rs
@@ -0,0 +1,89 @@
+use std::str;
+
+use fst::automaton::{Automaton, Str};
+use fst::{Streamer, IntoStreamer};
+use grenad::CompressionType;
+use heed::BytesEncode;
+use heed::types::ByteSlice;
+use log::debug;
+
+use crate::Index;
+use crate::heed_codec::StrStrU8Codec;
+use crate::update::index_documents::{
+    WriteMethod, create_sorter, sorter_into_lmdb_database,
+    words_pairs_proximities_docids_merge,
+};
+
+pub struct WordPrefixPairProximityDocids<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'i, 'u>,
+    index: &'i Index,
+    pub(crate) chunk_compression_type: CompressionType,
+    pub(crate) chunk_compression_level: Option<u32>,
+    pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    pub(crate) max_nb_chunks: Option<usize>,
+    pub(crate) max_memory: Option<usize>,
+}
+
+impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
+    pub fn new(
+        wtxn: &'t mut heed::RwTxn<'i, 'u>,
+        index: &'i Index,
+    ) -> WordPrefixPairProximityDocids<'t, 'u, 'i>
+    {
+        WordPrefixPairProximityDocids {
+            wtxn,
+            index,
+            chunk_compression_type: CompressionType::None,
+            chunk_compression_level: None,
+            chunk_fusing_shrink_size: None,
+            max_nb_chunks: None,
+            max_memory: None,
+        }
+    }
+
+    pub fn execute(self) -> anyhow::Result<()> {
+        debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
+
+        self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
+
+        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+
+        // Here we create a sorter akin to the previous one.
+        let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
+            words_pairs_proximities_docids_merge,
+            self.chunk_compression_type,
+            self.chunk_compression_level,
+            self.chunk_fusing_shrink_size,
+            self.max_nb_chunks,
+            self.max_memory,
+        );
+
+        // We insert all the word pairs corresponding to the word-prefix pairs
+        // where the prefixes appear in the prefix FST previously constructed.
+        let db = self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>();
+        for result in db.iter(self.wtxn)? {
+            let ((word1, word2, prox), data) = result?;
+            let automaton = Str::new(word2).starts_with();
+            let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
+            while let Some(prefix) = matching_prefixes.next() {
+                let prefix = str::from_utf8(prefix)?;
+                let pair = (word1, prefix, prox);
+                let bytes = StrStrU8Codec::bytes_encode(&pair).unwrap();
+                word_prefix_pair_proximity_docids_sorter.insert(bytes, data)?;
+            }
+        }
+
+        drop(prefix_fst);
+
+        // We finally write the word prefix pair proximity docids into the LMDB database.
+        sorter_into_lmdb_database(
+            self.wtxn,
+            *self.index.word_prefix_pair_proximity_docids.as_polymorph(),
+            word_prefix_pair_proximity_docids_sorter,
+            words_pairs_proximities_docids_merge,
+            WriteMethod::Append,
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/milli/src/update/words_level_positions.rs b/milli/src/update/words_level_positions.rs
index 70bc89860..1b772c37d 100644
--- a/milli/src/update/words_level_positions.rs
+++ b/milli/src/update/words_level_positions.rs
@@ -1,17 +1,22 @@
-use std::cmp;
+use std::{cmp, str};
 use std::convert::TryFrom;
 use std::fs::File;
 use std::num::NonZeroU32;
 
+use fst::automaton::{self, Automaton};
+use fst::{Streamer, IntoStreamer};
 use grenad::{CompressionType, Reader, Writer, FileFuse};
-use heed::types::{DecodeIgnore, Str};
+use heed::types::{ByteSlice, DecodeIgnore, Str};
 use heed::{BytesEncode, Error};
 use log::debug;
 use roaring::RoaringBitmap;
 
 use crate::heed_codec::{StrLevelPositionCodec, CboRoaringBitmapCodec};
 use crate::update::index_documents::WriteMethod;
-use crate::update::index_documents::{create_writer, writer_into_reader, write_into_lmdb_database};
+use crate::update::index_documents::{
+    create_writer, create_sorter, writer_into_reader, write_into_lmdb_database,
+    word_prefix_level_positions_docids_merge, sorter_into_lmdb_database
+};
 use crate::{Index, TreeLevel};
 
 pub struct WordsLevelPositions<'t, 'u, 'i> {
@@ -20,27 +25,24 @@ pub struct WordsLevelPositions<'t, 'u, 'i> {
     pub(crate) chunk_compression_type: CompressionType,
     pub(crate) chunk_compression_level: Option<u32>,
     pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    pub(crate) max_nb_chunks: Option<usize>,
+    pub(crate) max_memory: Option<usize>,
     level_group_size: NonZeroU32,
     min_level_size: NonZeroU32,
-    _update_id: u64,
 }
 
 impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
-    pub fn new(
-        wtxn: &'t mut heed::RwTxn<'i, 'u>,
-        index: &'i Index,
-        update_id: u64,
-    ) -> WordsLevelPositions<'t, 'u, 'i>
-    {
+    pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> WordsLevelPositions<'t, 'u, 'i> {
         WordsLevelPositions {
             wtxn,
             index,
             chunk_compression_type: CompressionType::None,
             chunk_compression_level: None,
             chunk_fusing_shrink_size: None,
+            max_nb_chunks: None,
+            max_memory: None,
             level_group_size: NonZeroU32::new(4).unwrap(),
             min_level_size: NonZeroU32::new(5).unwrap(),
-            _update_id: update_id,
         }
     }
 
@@ -76,7 +78,71 @@ impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
             self.wtxn,
             *self.index.word_level_position_docids.as_polymorph(),
             entries,
-            |_, _| anyhow::bail!("invalid facet level merging"),
+            |_, _| anyhow::bail!("invalid word level position merging"),
             WriteMethod::Append,
         )?;
 
+        // We compute the word prefix level positions database.
+        self.index.word_prefix_level_position_docids.clear(self.wtxn)?;
+
+        let mut word_prefix_level_positions_docids_sorter = create_sorter(
+            word_prefix_level_positions_docids_merge,
+            self.chunk_compression_type,
+            self.chunk_compression_level,
+            self.chunk_fusing_shrink_size,
+            self.max_nb_chunks,
+            self.max_memory,
+        );
+
+        // We insert the word prefix level positions where the level is equal to 0 and
+        // which correspond to the word level positions where the prefixes appear
+        // in the prefix FST previously constructed.
+        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+        let db = self.index.word_level_position_docids.remap_data_type::<ByteSlice>();
+        for result in db.iter(self.wtxn)? {
+            let ((word, level, left, right), data) = result?;
+            if level == TreeLevel::min_value() {
+                let automaton = automaton::Str::new(word).starts_with();
+                let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
+                while let Some(prefix) = matching_prefixes.next() {
+                    let prefix = str::from_utf8(prefix)?;
+                    let key = (prefix, level, left, right);
+                    let bytes = StrLevelPositionCodec::bytes_encode(&key).unwrap();
+                    word_prefix_level_positions_docids_sorter.insert(bytes, data)?;
+                }
+            }
+        }
+
+        // We finally write all the word prefix level positions docids with
+        // a level equal to 0 into the LMDB database.
+        sorter_into_lmdb_database(
+            self.wtxn,
+            *self.index.word_prefix_level_position_docids.as_polymorph(),
+            word_prefix_level_positions_docids_sorter,
+            word_prefix_level_positions_docids_merge,
+            WriteMethod::Append,
+        )?;
+
+        let entries = compute_positions_levels(
+            self.wtxn,
+            self.index.word_prefix_docids.remap_data_type::<DecodeIgnore>(),
+            self.index.word_prefix_level_position_docids,
+            self.chunk_compression_type,
+            self.chunk_compression_level,
+            self.chunk_fusing_shrink_size,
+            self.level_group_size,
+            self.min_level_size,
+        )?;
+
+        // The previously computed entries also define the level 0 entries,
+        // so we can clear the database and append all of these entries.
+        self.index.word_prefix_level_position_docids.clear(self.wtxn)?;
+
+        write_into_lmdb_database(
+            self.wtxn,
+            *self.index.word_prefix_level_position_docids.as_polymorph(),
+            entries,
+            |_, _| anyhow::bail!("invalid word prefix level position merging"),
+            WriteMethod::Append,
+        )?;
diff --git a/milli/src/update/words_prefixes.rs b/milli/src/update/words_prefixes.rs
deleted file mode 100644
index f2fe526a2..000000000
--- a/milli/src/update/words_prefixes.rs
+++ /dev/null
@@ -1,196 +0,0 @@
-use std::iter::FromIterator;
-use std::str;
-
-use chrono::Utc;
-use fst::automaton::Str;
-use fst::{Automaton, Streamer, IntoStreamer};
-use grenad::CompressionType;
-use heed::BytesEncode;
-use heed::types::ByteSlice;
-
-use crate::heed_codec::StrStrU8Codec;
-use crate::update::index_documents::WriteMethod;
-use crate::update::index_documents::{create_sorter, sorter_into_lmdb_database};
-use crate::update::index_documents::{word_docids_merge, words_pairs_proximities_docids_merge};
-use crate::{Index, SmallString32};
-
-pub struct WordsPrefixes<'t, 'u, 'i> {
-    wtxn: &'t mut heed::RwTxn<'i, 'u>,
-    index: &'i Index,
-    pub(crate) chunk_compression_type: CompressionType,
-    pub(crate) chunk_compression_level: Option<u32>,
-    pub(crate) chunk_fusing_shrink_size: Option<u64>,
-    pub(crate) max_nb_chunks: Option<usize>,
-    pub(crate) max_memory: Option<usize>,
-    threshold: f64,
-    max_prefix_length: usize,
-    _update_id: u64,
-}
-
-impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> {
-    pub fn new(
-        wtxn: &'t mut heed::RwTxn<'i, 'u>,
-        index: &'i Index,
-        update_id: u64,
-    ) -> WordsPrefixes<'t, 'u, 'i>
-    {
-        WordsPrefixes {
-            wtxn,
-            index,
-            chunk_compression_type: CompressionType::None,
-            chunk_compression_level: None,
-            chunk_fusing_shrink_size: None,
-            max_nb_chunks: None,
-            max_memory: None,
-            threshold: 0.1 / 100.0, // .01%
-            max_prefix_length: 4,
-            _update_id: update_id,
-        }
-    }
-
-    /// Set the ratio of concerned words required to make a prefix be part of the words prefixes
-    /// database. If a word prefix is supposed to match more than this number of words in the
-    /// dictionnary, therefore this prefix is added to the words prefixes datastructures.
-    ///
-    /// Default value is `0.01` or `1%`. This value must be between 0 and 1 and will be clamped
-    /// to these bounds otherwise.
-    pub fn threshold(&mut self, value: f64) -> &mut Self {
-        self.threshold = value.min(1.0).max(0.0); // clamp [0, 1]
-        self
-    }
-
-    /// Set the maximum length of prefixes in bytes.
-    ///
-    /// Default value is `4` bytes. This value must be between 1 and 25 will be clamped
-    /// to these bounds, otherwise.
-    pub fn max_prefix_length(&mut self, value: usize) -> &mut Self {
-        self.max_prefix_length = value.min(25).max(1); // clamp [1, 25]
-        self
-    }
-
-    pub fn execute(self) -> anyhow::Result<()> {
-        self.index.set_updated_at(self.wtxn, &Utc::now())?;
-        // Clear the words prefixes datastructures.
-        self.index.word_prefix_docids.clear(self.wtxn)?;
-        self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
-
-        let words_fst = self.index.words_fst(&self.wtxn)?;
-        let number_of_words = words_fst.len();
-        let min_number_of_words = (number_of_words as f64 * self.threshold) as usize;
-
-        // It is forbidden to keep a mutable reference into the database
-        // and write into it at the same time, therefore we write into another file.
-        let mut prefix_docids_sorter = create_sorter(
-            word_docids_merge,
-            self.chunk_compression_type,
-            self.chunk_compression_level,
-            self.chunk_fusing_shrink_size,
-            self.max_nb_chunks,
-            self.max_memory,
-        );
-
-        let mut prefix_fsts = Vec::with_capacity(self.max_prefix_length);
-        for n in 1..=self.max_prefix_length {
-
-            let mut current_prefix = SmallString32::new();
-            let mut current_prefix_count = 0;
-            let mut builder = fst::SetBuilder::memory();
-
-            let mut stream = words_fst.stream();
-            while let Some(bytes) = stream.next() {
-                // We try to get the first n bytes out of this string but we only want
-                // to split at valid characters bounds. If we try to split in the middle of
-                // a character we ignore this word and go to the next one.
-                let word = str::from_utf8(bytes)?;
-                let prefix = match word.get(..n) {
-                    Some(prefix) => prefix,
-                    None => continue,
-                };
-
-                // This is the first iteration of the loop,
-                // or the current word doesn't starts with the current prefix.
-                if current_prefix_count == 0 || prefix != current_prefix.as_str() {
-                    current_prefix = SmallString32::from(prefix);
-                    current_prefix_count = 0;
-                }
-
-                current_prefix_count += 1;
-
-                // There is enough words corresponding to this prefix to add it to the cache.
-                if current_prefix_count == min_number_of_words {
-                    builder.insert(prefix)?;
-                }
-            }
-
-            // We construct the final set for prefixes of size n.
-            prefix_fsts.push(builder.into_set());
-        }
-
-        // We merge all of the previously computed prefixes into on final set.
-        let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
-        let mut builder = fst::SetBuilder::memory();
-        builder.extend_stream(op.r#union())?;
-        let prefix_fst = builder.into_set();
-
-        // We iterate over all the prefixes and retrieve the corresponding docids.
-        let mut prefix_stream = prefix_fst.stream();
-        while let Some(bytes) = prefix_stream.next() {
-            let prefix = str::from_utf8(bytes)?;
-            let db = self.index.word_docids.remap_data_type::<ByteSlice>();
-            for result in db.prefix_iter(self.wtxn, prefix)? {
-                let (_word, data) = result?;
-                prefix_docids_sorter.insert(prefix, data)?;
-            }
-        }
-
-        // Set the words prefixes FST in the dtabase.
-        self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?;
-
-        // We finally write the word prefix docids into the LMDB database.
-        sorter_into_lmdb_database(
-            self.wtxn,
-            *self.index.word_prefix_docids.as_polymorph(),
-            prefix_docids_sorter,
-            word_docids_merge,
-            WriteMethod::Append,
-        )?;
-
-        // We compute the word prefix pair proximity database.
-
-        // Here we create a sorter akin to the previous one.
-        let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
-            words_pairs_proximities_docids_merge,
-            self.chunk_compression_type,
-            self.chunk_compression_level,
-            self.chunk_fusing_shrink_size,
-            self.max_nb_chunks,
-            self.max_memory,
-        );
-
-        // We insert all the word pairs corresponding to the word-prefix pairs
-        // where the prefixes appears in the prefix FST previously constructed.
-        let db = self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>();
-        for result in db.iter(self.wtxn)? {
-            let ((word1, word2, prox), data) = result?;
-            let automaton = Str::new(word2).starts_with();
-            let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
-            while let Some(prefix) = matching_prefixes.next() {
-                let prefix = str::from_utf8(prefix)?;
-                let pair = (word1, prefix, prox);
-                let bytes = StrStrU8Codec::bytes_encode(&pair).unwrap();
-                word_prefix_pair_proximity_docids_sorter.insert(bytes, data)?;
-            }
-        }
-
-        // We finally write the word prefix pair proximity docids into the LMDB database.
-        sorter_into_lmdb_database(
-            self.wtxn,
-            *self.index.word_prefix_pair_proximity_docids.as_polymorph(),
-            word_prefix_pair_proximity_docids_sorter,
-            words_pairs_proximities_docids_merge,
-            WriteMethod::Append,
-        )?;
-
-        Ok(())
-    }
-}
diff --git a/milli/src/update/words_prefixes_fst.rs b/milli/src/update/words_prefixes_fst.rs
new file mode 100644
index 000000000..f53b0ee00
--- /dev/null
+++ b/milli/src/update/words_prefixes_fst.rs
@@ -0,0 +1,104 @@
+use std::iter::FromIterator;
+use std::str;
+
+use fst::Streamer;
+use crate::{Index, SmallString32};
+
+pub struct WordsPrefixesFst<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'i, 'u>,
+    index: &'i Index,
+    threshold: f64,
+    max_prefix_length: usize,
+    _update_id: u64,
+}
+
+impl<'t, 'u, 'i> WordsPrefixesFst<'t, 'u, 'i> {
+    pub fn new(
+        wtxn: &'t mut heed::RwTxn<'i, 'u>,
+        index: &'i Index,
+        update_id: u64,
+    ) -> WordsPrefixesFst<'t, 'u, 'i>
+    {
+        WordsPrefixesFst {
+            wtxn,
+            index,
+            threshold: 0.1 / 100.0, // 0.1%
+            max_prefix_length: 4,
+            _update_id: update_id,
+        }
+    }
+
+    /// Set the ratio of concerned words required to make a prefix be part of the words prefixes
+    /// database. If a word prefix matches more than this ratio of the words in the
+    /// dictionary, then this prefix is added to the words prefixes datastructures.
+    ///
+    /// Default value is `0.001` or `0.1%`. This value must be between 0 and 1 and will be clamped
+    /// to these bounds otherwise.
+    pub fn threshold(&mut self, value: f64) -> &mut Self {
+        self.threshold = value.min(1.0).max(0.0); // clamp [0, 1]
+        self
+    }
+
+    /// Set the maximum length of prefixes in bytes.
+    ///
+    /// Default value is `4` bytes. This value must be between 1 and 25 and will be clamped
+    /// to these bounds otherwise.
+    pub fn max_prefix_length(&mut self, value: usize) -> &mut Self {
+        self.max_prefix_length = value.min(25).max(1); // clamp [1, 25]
+        self
+    }
+
+    pub fn execute(self) -> anyhow::Result<()> {
+        let words_fst = self.index.words_fst(&self.wtxn)?;
+        let number_of_words = words_fst.len();
+        let min_number_of_words = (number_of_words as f64 * self.threshold) as usize;
+
+        let mut prefix_fsts = Vec::with_capacity(self.max_prefix_length);
+        for n in 1..=self.max_prefix_length {
+
+            let mut current_prefix = SmallString32::new();
+            let mut current_prefix_count = 0;
+            let mut builder = fst::SetBuilder::memory();
+
+            let mut stream = words_fst.stream();
+            while let Some(bytes) = stream.next() {
+                // We try to get the first n bytes out of this string but we only want
+                // to split at valid character bounds. If we try to split in the middle of
+                // a character we ignore this word and go to the next one.
+                let word = str::from_utf8(bytes)?;
+                let prefix = match word.get(..n) {
+                    Some(prefix) => prefix,
+                    None => continue,
+                };
+
+                // This is the first iteration of the loop,
+                // or the current word doesn't start with the current prefix.
+                if current_prefix_count == 0 || prefix != current_prefix.as_str() {
+                    current_prefix = SmallString32::from(prefix);
+                    current_prefix_count = 0;
+                }
+
+                current_prefix_count += 1;
+
+                // There are enough words corresponding to this prefix to add it to the cache.
+                if current_prefix_count == min_number_of_words {
+                    builder.insert(prefix)?;
+                }
+            }
+
+            // We construct the final set for prefixes of size n.
+            prefix_fsts.push(builder.into_set());
+        }
+
+        // We merge all of the previously computed prefixes into one final set.
+        let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
+        let mut builder = fst::SetBuilder::memory();
+        builder.extend_stream(op.r#union())?;
+        let prefix_fst = builder.into_set();
+
+        // Set the words prefixes FST in the database.
+        self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?;
+
+        Ok(())
+    }
+}
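
A minimal sketch of driving the new prefix pipeline by hand, in the same order in which
IndexDocuments::execute now chains the stages (prefix FST first, then the prefix docids,
prefix pair proximities, and level positions, since each stage reads what the previous
ones wrote). The index path, map size, update_id, and tuning values below are
illustrative assumptions, not part of this patch:

    use heed::EnvOpenOptions;
    use milli::Index;
    use milli::update::{
        WordsPrefixesFst, WordPrefixDocids, WordPrefixPairProximityDocids, WordsLevelPositions,
    };

    fn main() -> anyhow::Result<()> {
        // Hypothetical index location and map size.
        let mut options = EnvOpenOptions::new();
        options.map_size(1024 * 1024 * 1024); // 1 GiB, an arbitrary example size
        let index = Index::new(options, "target/example.mmdb")?;

        let mut wtxn = index.write_txn()?;

        // 1. Rebuild the words prefixes FST. With the default 0.001 (0.1%) threshold,
        //    an index of 100_000 distinct words keeps a prefix only once at least
        //    100 words start with it.
        let mut builder = WordsPrefixesFst::new(&mut wtxn, &index, 0 /* update_id, illustrative */);
        builder.threshold(0.01); // example: require 1% of the words
        builder.max_prefix_length(2); // example: compute only 1 and 2 byte prefixes
        builder.execute()?;

        // 2. Derive the word prefix docids from the FST computed above.
        WordPrefixDocids::new(&mut wtxn, &index).execute()?;

        // 3. Derive the word prefix pair proximity docids the same way.
        WordPrefixPairProximityDocids::new(&mut wtxn, &index).execute()?;

        // 4. Rebuild the word (and word prefix) level position docids.
        WordsLevelPositions::new(&mut wtxn, &index).execute()?;

        wtxn.commit()?;
        Ok(())
    }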