Compute the words prefixes at the end of an update
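
The words prefixes are now recomputed automatically at the end of a document indexing update. The old WordsPrefixes update is split into WordsPrefixesFst, which only builds the words prefixes FST, and the new WordPrefixDocids and WordPrefixPairProximityDocids updates that derive the prefix databases from it. WordsLevelPositions additionally fills the new word-prefix-level-position-docids database, and the dedicated words-prefixes and words-level-positions HTTP routes are removed from the HTTP server.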

Kerollmops 2021-03-25 11:10:12 +01:00 committed by many
parent ab92c814c3
commit e65bad16cc
No known key found for this signature in database
GPG Key ID: 2CEF23B75189EACA
14 changed files with 409 additions and 323 deletions

View File

@ -228,8 +228,6 @@ enum UpdateMeta {
ClearDocuments,
Settings(Settings),
Facets(Facets),
WordsPrefixes(WordsPrefixes),
WordsLevelPositions(WordsLevelPositions),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -290,14 +288,6 @@ struct WordsLevelPositions {
min_level_size: Option<NonZeroU32>,
}
// Any value that is present is considered Some value, including null.
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where T: Deserialize<'de>,
D: Deserializer<'de>
{
Deserialize::deserialize(deserializer).map(Some)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
@ -496,36 +486,6 @@ async fn main() -> anyhow::Result<()> {
Err(e) => Err(e)
}
}
UpdateMeta::WordsPrefixes(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.words_prefixes(&mut wtxn, &index_cloned);
if let Some(value) = settings.threshold {
builder.threshold(value);
}
if let Some(value) = settings.max_prefix_length {
builder.max_prefix_length(value);
}
match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e)
}
},
UpdateMeta::WordsLevelPositions(levels) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.words_level_positions(&mut wtxn, &index_cloned);
if let Some(value) = levels.level_group_size {
builder.level_group_size(value);
}
if let Some(value) = levels.min_level_size {
builder.min_level_size(value);
}
match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into())
}
}
};
let meta = match result {
@ -942,32 +902,6 @@ async fn main() -> anyhow::Result<()> {
warp::reply()
});
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let change_words_prefixes_route = warp::filters::method::post()
.and(warp::path!("words-prefixes"))
.and(warp::body::json())
.map(move |settings: WordsPrefixes| {
let meta = UpdateMeta::WordsPrefixes(settings);
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
eprintln!("update {} registered", update_id);
warp::reply()
});
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let change_words_level_positions_route = warp::filters::method::post()
.and(warp::path!("words-level-positions"))
.and(warp::body::json())
.map(move |levels: WordsLevelPositions| {
let meta = UpdateMeta::WordsLevelPositions(levels);
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
eprintln!("update {} registered", update_id);
warp::reply()
});
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let abort_update_id_route = warp::filters::method::delete()
@ -1042,8 +976,6 @@ async fn main() -> anyhow::Result<()> {
.or(clearing_route)
.or(change_settings_route)
.or(change_facet_levels_route)
.or(change_words_prefixes_route)
.or(change_words_level_positions_route)
.or(update_ws_route);
let addr = SocketAddr::from_str(&opt.http_listen_addr)?;

View File

@ -338,6 +338,7 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
facet_field_id_value_docids,
field_id_docid_facet_values: _,
documents,
..
} = index;
let main_name = "main";

View File

@ -54,6 +54,8 @@ pub struct Index {
pub word_prefix_pair_proximity_docids: Database<StrStrU8Codec, CboRoaringBitmapCodec>,
/// Maps the word, level and position range with the docids that correspond to it.
pub word_level_position_docids: Database<StrLevelPositionCodec, CboRoaringBitmapCodec>,
/// Maps the level positions of a word prefix with all the docids where this prefix appears.
pub word_prefix_level_position_docids: Database<StrLevelPositionCodec, CboRoaringBitmapCodec>,
/// Maps the facet field id and the globally ordered value with the docids that correspond to it.
pub facet_field_id_value_docids: Database<ByteSlice, CboRoaringBitmapCodec>,
/// Maps the document id, the facet field id and the globally ordered value.
@ -64,7 +66,7 @@ pub struct Index {
impl Index {
pub fn new<P: AsRef<Path>>(mut options: heed::EnvOpenOptions, path: P) -> anyhow::Result<Index> {
options.max_dbs(10);
options.max_dbs(11);
let env = options.open(path)?;
let main = env.create_poly_database(Some("main"))?;
@ -74,6 +76,7 @@ impl Index {
let word_pair_proximity_docids = env.create_database(Some("word-pair-proximity-docids"))?;
let word_prefix_pair_proximity_docids = env.create_database(Some("word-prefix-pair-proximity-docids"))?;
let word_level_position_docids = env.create_database(Some("word-level-position-docids"))?;
let word_prefix_level_position_docids = env.create_database(Some("word-prefix-level-position-docids"))?;
let facet_field_id_value_docids = env.create_database(Some("facet-field-id-value-docids"))?;
let field_id_docid_facet_values = env.create_database(Some("field-id-docid-facet-values"))?;
let documents = env.create_database(Some("documents"))?;
@ -98,6 +101,7 @@ impl Index {
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
word_level_position_docids,
word_prefix_level_position_docids,
facet_field_id_value_docids,
field_id_docid_facet_values,
documents,
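
A minimal read-side sketch of the new database, assuming an open Index, a read transaction, and that the crate is consumed as milli; the key layout (prefix, level, left, right) follows the StrLevelPositionCodec declared above, and dump_prefix_level_positions is a hypothetical helper:

use milli::{Index, TreeLevel};

// Print, for every word prefix at level 0, its position range and the
// number of documents containing that prefix at those positions.
fn dump_prefix_level_positions(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
    for result in index.word_prefix_level_position_docids.iter(rtxn)? {
        let ((prefix, level, left, right), docids) = result?;
        if level == TreeLevel::min_value() {
            println!("{} [{}..={}] -> {} docids", prefix, left, right, docids.len());
        }
    }
    Ok(())
}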

View File

@ -29,6 +29,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
word_level_position_docids,
word_prefix_level_position_docids,
facet_field_id_value_docids,
field_id_docid_facet_values,
documents,
@ -57,6 +58,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
word_pair_proximity_docids.clear(self.wtxn)?;
word_prefix_pair_proximity_docids.clear(self.wtxn)?;
word_level_position_docids.clear(self.wtxn)?;
word_prefix_level_position_docids.clear(self.wtxn)?;
facet_field_id_value_docids.clear(self.wtxn)?;
field_id_docid_facet_values.clear(self.wtxn)?;
documents.clear(self.wtxn)?;

View File

@ -89,6 +89,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
word_level_position_docids,
word_prefix_level_position_docids,
facet_field_id_value_docids,
field_id_docid_facet_values,
documents,
@ -345,6 +346,21 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
drop(iter);
// We delete the document ids that are under the word prefix level position docids.
let mut iter = word_prefix_level_position_docids.iter_mut(self.wtxn)?.remap_key_type::<ByteSlice>();
while let Some(result) = iter.next() {
let (bytes, mut docids) = result?;
let previous_len = docids.len();
docids.difference_with(&self.documents_ids);
if docids.is_empty() {
iter.del_current()?;
} else if docids.len() != previous_len {
iter.put_current(bytes, &docids)?;
}
}
drop(iter);
Ok(self.documents_ids.len())
}
}

View File

@ -52,6 +52,10 @@ pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -
cbo_roaring_bitmap_merge(values)
}
pub fn word_prefix_level_positions_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
}
pub fn word_level_position_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
}
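
These merge functions all share the same union semantics; here is a rough sketch of what such a CBO roaring bitmap merge amounts to, assuming the CboRoaringBitmapCodec from this crate (the real cbo_roaring_bitmap_merge helper may differ in details):

use std::borrow::Cow;
use anyhow::Context;
use heed::{BytesDecode, BytesEncode};
use roaring::RoaringBitmap;
use crate::heed_codec::CboRoaringBitmapCodec;

// Union all the serialized bitmaps that were produced for the same key.
fn union_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
    let mut union = RoaringBitmap::new();
    for value in values {
        let bitmap = CboRoaringBitmapCodec::bytes_decode(value)
            .context("invalid CBO roaring bitmap")?;
        union.union_with(&bitmap);
    }
    let bytes = CboRoaringBitmapCodec::bytes_encode(&union)
        .context("cannot encode CBO roaring bitmap")?;
    Ok(bytes.into_owned())
}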

View File

@ -3,6 +3,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::{self, Seek, SeekFrom};
use std::num::{NonZeroU32, NonZeroUsize};
use std::str;
use std::sync::mpsc::sync_channel;
use std::time::Instant;
@ -13,18 +14,21 @@ use grenad::{MergerIter, Writer, Sorter, Merger, Reader, FileFuse, CompressionTy
use heed::types::ByteSlice;
use log::{debug, info, error};
use memmap::Mmap;
use rayon::ThreadPool;
use rayon::prelude::*;
use rayon::ThreadPool;
use serde::{Serialize, Deserialize};
use crate::index::Index;
use crate::update::{Facets, WordsLevelPositions, WordsPrefixes, UpdateIndexingStep};
use crate::update::{
Facets, WordsLevelPositions, WordPrefixDocids, WordsPrefixesFst, UpdateIndexingStep,
WordPrefixPairProximityDocids,
};
use self::store::{Store, Readers};
pub use self::merge_function::{
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
docid_word_positions_merge, documents_merge,
word_level_position_docids_merge, facet_field_value_docids_merge,
field_id_docid_facet_values_merge,
word_level_position_docids_merge, word_prefix_level_positions_docids_merge,
facet_field_value_docids_merge, field_id_docid_facet_values_merge,
};
pub use self::transform::{Transform, TransformOutput};
@ -719,10 +723,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
builder.execute()?;
// Run the words prefixes update operation.
let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
let mut builder = WordsPrefixesFst::new(self.wtxn, self.index, self.update_id);
if let Some(value) = self.words_prefix_threshold {
builder.threshold(value);
}
@ -731,8 +732,26 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
}
builder.execute()?;
// Run the word prefix docids update operation.
let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.execute()?;
// Run the word prefix pair proximity docids update operation.
let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.execute()?;
// Run the words level positions update operation.
let mut builder = WordsLevelPositions::new(self.wtxn, self.index, self.update_id);
let mut builder = WordsLevelPositions::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
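
With this change, every document indexing update ends by rebuilding the prefix datastructures in a fixed order: the words prefixes FST first, then the word prefix docids, the word prefix pair proximity docids, and finally the words level positions. The FST builder no longer takes chunk settings, while the database builders inherit the compression and fusing parameters of the indexing step.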

View File

@ -6,8 +6,10 @@ pub use self::index_documents::{DocumentAdditionResult, IndexDocuments, IndexDoc
pub use self::settings::{Setting, Settings};
pub use self::update_builder::UpdateBuilder;
pub use self::update_step::UpdateIndexingStep;
pub use self::word_prefix_docids::WordPrefixDocids;
pub use self::word_prefix_pair_proximity_docids::WordPrefixPairProximityDocids;
pub use self::words_level_positions::WordsLevelPositions;
pub use self::words_prefixes::WordsPrefixes;
pub use self::words_prefixes_fst::WordsPrefixesFst;
mod available_documents_ids;
mod clear_documents;
@ -17,6 +19,7 @@ mod index_documents;
mod settings;
mod update_builder;
mod update_step;
mod word_prefix_docids;
mod word_prefix_pair_proximity_docids;
mod words_level_positions;
mod words_prefixes;
mod words_prefixes_fst;

View File

@ -2,10 +2,7 @@ use grenad::CompressionType;
use rayon::ThreadPool;
use crate::Index;
use super::{
ClearDocuments, DeleteDocuments, IndexDocuments, Settings,
Facets, WordsPrefixes, WordsLevelPositions,
};
use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings, Facets};
pub struct UpdateBuilder<'a> {
pub(crate) log_every_n: Option<usize>,
@ -138,34 +135,4 @@ impl<'a> UpdateBuilder<'a> {
builder
}
pub fn words_prefixes<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> WordsPrefixes<'t, 'u, 'i>
{
let mut builder = WordsPrefixes::new(wtxn, index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
builder
}
pub fn words_level_positions<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> WordsLevelPositions<'t, 'u, 'i>
{
let mut builder = WordsLevelPositions::new(wtxn, index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
builder
}
}

View File

@ -0,0 +1,75 @@
use std::str;
use crate::Index;
use fst::Streamer;
use grenad::CompressionType;
use heed::types::ByteSlice;
use crate::update::index_documents::WriteMethod;
use crate::update::index_documents::{create_sorter, word_docids_merge, sorter_into_lmdb_database};
pub struct WordPrefixDocids<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
}
impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> WordPrefixDocids<'t, 'u, 'i> {
WordPrefixDocids {
wtxn,
index,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
max_nb_chunks: None,
max_memory: None,
}
}
pub fn execute(self) -> anyhow::Result<()> {
// Clear the word prefix docids database.
self.index.word_prefix_docids.clear(self.wtxn)?;
let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, so we write into an intermediate sorter instead.
let mut prefix_docids_sorter = create_sorter(
word_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
// We iterate over all the prefixes and retrieve the corresponding docids.
let mut prefix_stream = prefix_fst.stream();
while let Some(bytes) = prefix_stream.next() {
let prefix = str::from_utf8(bytes)?;
let db = self.index.word_docids.remap_data_type::<ByteSlice>();
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
prefix_docids_sorter.insert(prefix, data)?;
}
}
drop(prefix_fst);
// We finally write the word prefix docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_docids.as_polymorph(),
prefix_docids_sorter,
word_docids_merge,
WriteMethod::Append,
)?;
Ok(())
}
}
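
A minimal usage sketch, assuming the crate is consumed as milli and that a words prefixes FST has already been stored in the index; IndexDocuments now drives this builder automatically, and rebuild_word_prefix_docids is a hypothetical helper:

use milli::Index;
use milli::update::WordPrefixDocids;

// Recompute the word prefix docids database from the current prefix FST.
fn rebuild_word_prefix_docids(index: &Index) -> anyhow::Result<()> {
    let mut wtxn = index.write_txn()?;
    let builder = WordPrefixDocids::new(&mut wtxn, index);
    builder.execute()?;
    wtxn.commit()?;
    Ok(())
}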

View File

@ -0,0 +1,89 @@
use std::str;
use fst::automaton::{Automaton, Str};
use fst::{Streamer, IntoStreamer};
use grenad::CompressionType;
use heed::BytesEncode;
use heed::types::ByteSlice;
use log::debug;
use crate::Index;
use crate::heed_codec::StrStrU8Codec;
use crate::update::index_documents::{
WriteMethod, create_sorter, sorter_into_lmdb_database,
words_pairs_proximities_docids_merge,
};
pub struct WordPrefixPairProximityDocids<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
}
impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> WordPrefixPairProximityDocids<'t, 'u, 'i>
{
WordPrefixPairProximityDocids {
wtxn,
index,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
max_nb_chunks: None,
max_memory: None,
}
}
pub fn execute(self) -> anyhow::Result<()> {
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We create a sorter that will merge the docids of identical word-prefix pair keys.
let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
words_pairs_proximities_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
// We insert all the word pairs corresponding to the word-prefix pairs
// where the prefix appears in the prefix FST previously constructed.
let db = self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>();
for result in db.iter(self.wtxn)? {
let ((word1, word2, prox), data) = result?;
let automaton = Str::new(word2).starts_with();
let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
while let Some(prefix) = matching_prefixes.next() {
let prefix = str::from_utf8(prefix)?;
let pair = (word1, prefix, prox);
let bytes = StrStrU8Codec::bytes_encode(&pair).unwrap();
word_prefix_pair_proximity_docids_sorter.insert(bytes, data)?;
}
}
drop(prefix_fst);
// We finally write the word prefix pair proximity docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_pair_proximity_docids.as_polymorph(),
word_prefix_pair_proximity_docids_sorter,
words_pairs_proximities_docids_merge,
WriteMethod::Append,
)?;
Ok(())
}
}
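
As a worked example: given an entry ("hello", "world", 1) in the word pair proximity database, and the prefixes "w", "wo", "wor" and "worl" present in the FST, this loop emits the keys ("hello", "w", 1) through ("hello", "worl", 1), and the sorter unions the docids of every pair that collapses onto the same word-prefix key.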

View File

@ -1,17 +1,22 @@
use std::cmp;
use std::{cmp, str};
use std::convert::TryFrom;
use std::fs::File;
use std::num::NonZeroU32;
use fst::automaton::{self, Automaton};
use fst::{Streamer, IntoStreamer};
use grenad::{CompressionType, Reader, Writer, FileFuse};
use heed::types::{DecodeIgnore, Str};
use heed::types::{ByteSlice, DecodeIgnore, Str};
use heed::{BytesEncode, Error};
use log::debug;
use roaring::RoaringBitmap;
use crate::heed_codec::{StrLevelPositionCodec, CboRoaringBitmapCodec};
use crate::update::index_documents::WriteMethod;
use crate::update::index_documents::{create_writer, writer_into_reader, write_into_lmdb_database};
use crate::update::index_documents::{
create_writer, create_sorter, writer_into_reader, write_into_lmdb_database,
word_prefix_level_positions_docids_merge, sorter_into_lmdb_database
};
use crate::{Index, TreeLevel};
pub struct WordsLevelPositions<'t, 'u, 'i> {
@ -20,27 +25,24 @@ pub struct WordsLevelPositions<'t, 'u, 'i> {
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
level_group_size: NonZeroU32,
min_level_size: NonZeroU32,
_update_id: u64,
}
impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> WordsLevelPositions<'t, 'u, 'i>
{
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> WordsLevelPositions<'t, 'u, 'i> {
WordsLevelPositions {
wtxn,
index,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
max_nb_chunks: None,
max_memory: None,
level_group_size: NonZeroU32::new(4).unwrap(),
min_level_size: NonZeroU32::new(5).unwrap(),
_update_id: update_id,
}
}
@ -76,7 +78,71 @@ impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
self.wtxn,
*self.index.word_level_position_docids.as_polymorph(),
entries,
|_, _| anyhow::bail!("invalid facet level merging"),
|_, _| anyhow::bail!("invalid word level position merging"),
WriteMethod::Append,
)?;
// We compute the word prefix level positions database.
self.index.word_prefix_level_position_docids.clear(self.wtxn)?;
let mut word_prefix_level_positions_docids_sorter = create_sorter(
word_prefix_level_positions_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
// We insert the word prefix level positions where the level is equal to 0
// and the prefix appears in the prefix FST previously constructed.
let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
let db = self.index.word_level_position_docids.remap_data_type::<ByteSlice>();
for result in db.iter(self.wtxn)? {
let ((word, level, left, right), data) = result?;
if level == TreeLevel::min_value() {
let automaton = automaton::Str::new(word).starts_with();
let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
while let Some(prefix) = matching_prefixes.next() {
let prefix = str::from_utf8(prefix)?;
let key = (prefix, level, left, right);
let bytes = StrLevelPositionCodec::bytes_encode(&key).unwrap();
word_prefix_level_positions_docids_sorter.insert(bytes, data)?;
}
}
}
// We finally write all the word prefix level positions docids with
// a level equal to 0 into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_level_position_docids.as_polymorph(),
word_prefix_level_positions_docids_sorter,
word_prefix_level_positions_docids_merge,
WriteMethod::Append,
)?;
let entries = compute_positions_levels(
self.wtxn,
self.index.word_prefix_docids.remap_data_type::<DecodeIgnore>(),
self.index.word_prefix_level_position_docids,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.level_group_size,
self.min_level_size,
)?;
// The previously computed entries also define the level 0 entries,
// so we can clear the database and append all of these entries.
self.index.word_prefix_level_position_docids.clear(self.wtxn)?;
write_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_level_position_docids.as_polymorph(),
entries,
|_, _| anyhow::bail!("invalid word prefix level position merging"),
WriteMethod::Append,
)?;
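
Note that the level 0 prefix entries written through the sorter above are immediately read back by compute_positions_levels, which regenerates every level, level 0 included; this is why the database can be cleared a second time and the computed entries appended wholesale.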

View File

@ -1,196 +0,0 @@
use std::iter::FromIterator;
use std::str;
use chrono::Utc;
use fst::automaton::Str;
use fst::{Automaton, Streamer, IntoStreamer};
use grenad::CompressionType;
use heed::BytesEncode;
use heed::types::ByteSlice;
use crate::heed_codec::StrStrU8Codec;
use crate::update::index_documents::WriteMethod;
use crate::update::index_documents::{create_sorter, sorter_into_lmdb_database};
use crate::update::index_documents::{word_docids_merge, words_pairs_proximities_docids_merge};
use crate::{Index, SmallString32};
pub struct WordsPrefixes<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
threshold: f64,
max_prefix_length: usize,
_update_id: u64,
}
impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> WordsPrefixes<'t, 'u, 'i>
{
WordsPrefixes {
wtxn,
index,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
max_nb_chunks: None,
max_memory: None,
threshold: 0.1 / 100.0, // 0.1%
max_prefix_length: 4,
_update_id: update_id,
}
}
/// Set the ratio of words a prefix must cover to be part of the words prefixes
/// database. If a word prefix matches more than this ratio of the words in the
/// dictionary, the prefix is added to the words prefixes data structures.
///
/// Default value is `0.001`, i.e. `0.1%`. This value must be between 0 and 1 and will be clamped
/// to these bounds otherwise.
pub fn threshold(&mut self, value: f64) -> &mut Self {
self.threshold = value.min(1.0).max(0.0); // clamp [0, 1]
self
}
/// Set the maximum length of prefixes in bytes.
///
/// Default value is `4` bytes. This value must be between 1 and 25 and will be clamped
/// to these bounds otherwise.
pub fn max_prefix_length(&mut self, value: usize) -> &mut Self {
self.max_prefix_length = value.min(25).max(1); // clamp [1, 25]
self
}
pub fn execute(self) -> anyhow::Result<()> {
self.index.set_updated_at(self.wtxn, &Utc::now())?;
// Clear the words prefixes datastructures.
self.index.word_prefix_docids.clear(self.wtxn)?;
self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
let words_fst = self.index.words_fst(&self.wtxn)?;
let number_of_words = words_fst.len();
let min_number_of_words = (number_of_words as f64 * self.threshold) as usize;
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, so we write into an intermediate sorter instead.
let mut prefix_docids_sorter = create_sorter(
word_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
let mut prefix_fsts = Vec::with_capacity(self.max_prefix_length);
for n in 1..=self.max_prefix_length {
let mut current_prefix = SmallString32::new();
let mut current_prefix_count = 0;
let mut builder = fst::SetBuilder::memory();
let mut stream = words_fst.stream();
while let Some(bytes) = stream.next() {
// We try to get the first n bytes out of this string but we only want
// to split at valid character bounds. If we would split in the middle of
// a character, we ignore this word and go to the next one.
let word = str::from_utf8(bytes)?;
let prefix = match word.get(..n) {
Some(prefix) => prefix,
None => continue,
};
// This is the first iteration of the loop,
// or the current word doesn't start with the current prefix.
if current_prefix_count == 0 || prefix != current_prefix.as_str() {
current_prefix = SmallString32::from(prefix);
current_prefix_count = 0;
}
current_prefix_count += 1;
// There are enough words corresponding to this prefix to add it to the cache.
if current_prefix_count == min_number_of_words {
builder.insert(prefix)?;
}
}
// We construct the final set for prefixes of size n.
prefix_fsts.push(builder.into_set());
}
// We merge all of the previously computed prefixes into one final set.
let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
let mut builder = fst::SetBuilder::memory();
builder.extend_stream(op.r#union())?;
let prefix_fst = builder.into_set();
// We iterate over all the prefixes and retrieve the corresponding docids.
let mut prefix_stream = prefix_fst.stream();
while let Some(bytes) = prefix_stream.next() {
let prefix = str::from_utf8(bytes)?;
let db = self.index.word_docids.remap_data_type::<ByteSlice>();
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
prefix_docids_sorter.insert(prefix, data)?;
}
}
// Set the words prefixes FST in the database.
self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?;
// We finally write the word prefix docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_docids.as_polymorph(),
prefix_docids_sorter,
word_docids_merge,
WriteMethod::Append,
)?;
// We compute the word prefix pair proximity database.
// Here we create a sorter akin to the previous one.
let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
words_pairs_proximities_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
// We insert all the word pairs corresponding to the word-prefix pairs
// where the prefix appears in the prefix FST previously constructed.
let db = self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>();
for result in db.iter(self.wtxn)? {
let ((word1, word2, prox), data) = result?;
let automaton = Str::new(word2).starts_with();
let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
while let Some(prefix) = matching_prefixes.next() {
let prefix = str::from_utf8(prefix)?;
let pair = (word1, prefix, prox);
let bytes = StrStrU8Codec::bytes_encode(&pair).unwrap();
word_prefix_pair_proximity_docids_sorter.insert(bytes, data)?;
}
}
// We finally write the word prefix pair proximity docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_pair_proximity_docids.as_polymorph(),
word_prefix_pair_proximity_docids_sorter,
words_pairs_proximities_docids_merge,
WriteMethod::Append,
)?;
Ok(())
}
}

View File

@ -0,0 +1,104 @@
use std::iter::FromIterator;
use std::str;
use fst::Streamer;
use crate::{Index, SmallString32};
pub struct WordsPrefixesFst<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
threshold: f64,
max_prefix_length: usize,
_update_id: u64,
}
impl<'t, 'u, 'i> WordsPrefixesFst<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> WordsPrefixesFst<'t, 'u, 'i>
{
WordsPrefixesFst {
wtxn,
index,
threshold: 0.1 / 100.0, // 0.1%
max_prefix_length: 4,
_update_id: update_id,
}
}
/// Set the ratio of words a prefix must cover to be part of the words prefixes
/// database. If a word prefix matches more than this ratio of the words in the
/// dictionary, the prefix is added to the words prefixes data structures.
///
/// Default value is `0.001`, i.e. `0.1%`. This value must be between 0 and 1 and will be clamped
/// to these bounds otherwise.
pub fn threshold(&mut self, value: f64) -> &mut Self {
self.threshold = value.min(1.0).max(0.0); // clamp [0, 1]
self
}
/// Set the maximum length of prefixes in bytes.
///
/// Default value is `4` bytes. This value must be between 1 and 25 and will be clamped
/// to these bounds otherwise.
pub fn max_prefix_length(&mut self, value: usize) -> &mut Self {
self.max_prefix_length = value.min(25).max(1); // clamp [1, 25]
self
}
pub fn execute(self) -> anyhow::Result<()> {
let words_fst = self.index.words_fst(&self.wtxn)?;
let number_of_words = words_fst.len();
let min_number_of_words = (number_of_words as f64 * self.threshold) as usize;
let mut prefix_fsts = Vec::with_capacity(self.max_prefix_length);
for n in 1..=self.max_prefix_length {
let mut current_prefix = SmallString32::new();
let mut current_prefix_count = 0;
let mut builder = fst::SetBuilder::memory();
let mut stream = words_fst.stream();
while let Some(bytes) = stream.next() {
// We try to get the first n bytes out of this string but we only want
// to split at valid character bounds. If we would split in the middle of
// a character, we ignore this word and go to the next one.
let word = str::from_utf8(bytes)?;
let prefix = match word.get(..n) {
Some(prefix) => prefix,
None => continue,
};
// This is the first iteration of the loop,
// or the current word doesn't start with the current prefix.
if current_prefix_count == 0 || prefix != current_prefix.as_str() {
current_prefix = SmallString32::from(prefix);
current_prefix_count = 0;
}
current_prefix_count += 1;
// There are enough words corresponding to this prefix to add it to the cache.
if current_prefix_count == min_number_of_words {
builder.insert(prefix)?;
}
}
// We construct the final set for prefixes of size n.
prefix_fsts.push(builder.into_set());
}
// We merge all of the previously computed prefixes into one final set.
let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
let mut builder = fst::SetBuilder::memory();
builder.extend_stream(op.r#union())?;
let prefix_fst = builder.into_set();
// Set the words prefixes FST in the database.
self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?;
Ok(())
}
}
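
A minimal usage sketch, assuming the crate is consumed as milli; rebuild_prefix_fst is a hypothetical helper. With the default threshold of 0.001, a dictionary of 1,000,000 words requires a prefix to cover at least 1,000 of them to enter the FST:

use milli::Index;
use milli::update::WordsPrefixesFst;

// Rebuild only the words prefixes FST, without touching the prefix databases.
fn rebuild_prefix_fst(index: &Index, update_id: u64) -> anyhow::Result<()> {
    let mut wtxn = index.write_txn()?;
    let mut builder = WordsPrefixesFst::new(&mut wtxn, index, update_id);
    builder.threshold(0.01);      // a prefix must cover 1% of the words
    builder.max_prefix_length(2); // only keep prefixes of at most 2 bytes
    builder.execute()?;
    wtxn.commit()?;
    Ok(())
}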