Merge pull request #79 from meilisearch/prefix-caches

Introduce prefix databases
Clément Renault 2021-02-17 11:27:15 +01:00 committed by GitHub
commit 85c3d8aa52
9 changed files with 499 additions and 49 deletions


@ -16,23 +16,29 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
const MAIN_DB_NAME: &str = "main";
const WORD_DOCIDS_DB_NAME: &str = "word-docids";
const WORD_PREFIX_DOCIDS_DB_NAME: &str = "word-prefix-docids";
const DOCID_WORD_POSITIONS_DB_NAME: &str = "docid-word-positions";
const WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME: &str = "word-pair-proximity-docids";
const WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME: &str = "word-prefix-pair-proximity-docids";
const DOCUMENTS_DB_NAME: &str = "documents";
const USERS_IDS_DOCUMENTS_IDS: &[u8] = b"users-ids-documents-ids";
const ALL_DATABASE_NAMES: &[&str] = &[
MAIN_DB_NAME,
WORD_DOCIDS_DB_NAME,
WORD_PREFIX_DOCIDS_DB_NAME,
DOCID_WORD_POSITIONS_DB_NAME,
WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME,
WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME,
DOCUMENTS_DB_NAME,
];
const POSTINGS_DATABASE_NAMES: &[&str] = &[
WORD_DOCIDS_DB_NAME,
WORD_PREFIX_DOCIDS_DB_NAME,
DOCID_WORD_POSITIONS_DB_NAME,
WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME,
WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME,
];
#[derive(Debug, StructOpt)]
@ -85,6 +91,16 @@ enum Command {
words: Vec<String>,
},
/// Outputs a CSV with the documents ids where the given word prefixes appear.
WordsPrefixesDocids {
/// Display the whole documents ids in detail.
#[structopt(long)]
full_display: bool,
/// The prefixes to display the documents ids of.
prefixes: Vec<String>,
},
/// Outputs a CSV with the documents ids along with the facet values where they appear.
FacetValuesDocids {
/// Display the whole documents ids in detail.
@ -147,6 +163,12 @@ enum Command {
/// you can install it using `cargo install fst-bin`.
ExportWordsFst,
/// Outputs the words prefix FST to standard output.
///
/// One can use the FST binary helper to dissect and analyze it,
/// you can install it using `cargo install fst-bin`.
ExportWordsPrefixFst,
/// Outputs the documents as JSON lines to the standard output.
///
/// All of the fields are extracted, not just the displayed ones.
@ -186,6 +208,9 @@ fn run(opt: Opt) -> anyhow::Result<()> {
MostCommonWords { limit } => most_common_words(&index, &rtxn, limit),
BiggestValues { limit } => biggest_value_sizes(&index, &rtxn, limit),
WordsDocids { full_display, words } => words_docids(&index, &rtxn, !full_display, words),
WordsPrefixesDocids { full_display, prefixes } => {
words_prefixes_docids(&index, &rtxn, !full_display, prefixes)
},
FacetValuesDocids { full_display, field_name } => {
facet_values_docids(&index, &rtxn, !full_display, field_name)
},
@ -201,6 +226,7 @@ fn run(opt: Opt) -> anyhow::Result<()> {
word_pair_proximities_docids(&index, &rtxn, !full_display, word1, word2)
},
ExportWordsFst => export_words_fst(&index, &rtxn),
ExportWordsPrefixFst => export_words_prefix_fst(&index, &rtxn),
ExportDocuments => export_documents(&index, &rtxn),
PatchToNewExternalIds => {
drop(rtxn);
@ -311,8 +337,10 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
env: _env,
main,
word_docids,
word_prefix_docids,
docid_word_positions,
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
facet_field_id_value_docids,
field_id_docid_facet_values: _,
documents,
@ -320,7 +348,9 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
let main_name = "main";
let word_docids_name = "word_docids";
let word_prefix_docids_name = "word_prefix_docids";
let docid_word_positions_name = "docid_word_positions";
let word_prefix_pair_proximity_docids_name = "word_prefix_pair_proximity_docids";
let word_pair_proximity_docids_name = "word_pair_proximity_docids";
let facet_field_id_value_docids_name = "facet_field_id_value_docids";
let documents_name = "documents";
@ -328,8 +358,16 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
let mut heap = BinaryHeap::with_capacity(limit + 1);
if limit > 0 {
// Fetch the words FST
let words_fst = index.words_fst(rtxn)?;
heap.push(Reverse((words_fst.as_fst().as_bytes().len(), format!("words-fst"), main_name)));
let length = words_fst.as_fst().as_bytes().len();
heap.push(Reverse((length, format!("words-fst"), main_name)));
if heap.len() > limit { heap.pop(); }
// Fetch the word prefix FST
let words_prefixes_fst = index.words_prefixes_fst(rtxn)?;
let length = words_prefixes_fst.as_fst().as_bytes().len();
heap.push(Reverse((length, format!("words-prefixes-fst"), main_name)));
if heap.len() > limit { heap.pop(); }
if let Some(documents_ids) = main.get::<_, Str, ByteSlice>(rtxn, "documents-ids")? {
@ -343,6 +381,12 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
if heap.len() > limit { heap.pop(); }
}
for result in word_prefix_docids.remap_data_type::<ByteSlice>().iter(rtxn)? {
let (word, value) = result?;
heap.push(Reverse((value.len(), word.to_string(), word_prefix_docids_name)));
if heap.len() > limit { heap.pop(); }
}
for result in docid_word_positions.remap_data_type::<ByteSlice>().iter(rtxn)? {
let ((docid, word), value) = result?;
let key = format!("{} {}", docid, word);
@ -357,6 +401,13 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
if heap.len() > limit { heap.pop(); }
}
for result in word_prefix_pair_proximity_docids.remap_data_type::<ByteSlice>().iter(rtxn)? {
let ((word, prefix, prox), value) = result?;
let key = format!("{} {} {}", word, prefix, prox);
heap.push(Reverse((value.len(), key, word_prefix_pair_proximity_docids_name)));
if heap.len() > limit { heap.pop(); }
}
let faceted_fields = index.faceted_fields_ids(rtxn)?;
let fields_ids_map = index.fields_ids_map(rtxn)?;
for (field_id, field_type) in faceted_fields {
@ -426,6 +477,43 @@ fn words_docids(index: &Index, rtxn: &heed::RoTxn, debug: bool, words: Vec<Strin
Ok(wtr.flush()?)
}
fn words_prefixes_docids(
index: &Index,
rtxn: &heed::RoTxn,
debug: bool,
prefixes: Vec<String>,
) -> anyhow::Result<()>
{
let stdout = io::stdout();
let mut wtr = csv::Writer::from_writer(stdout.lock());
wtr.write_record(&["prefix", "documents_ids"])?;
if prefixes.is_empty() {
for result in index.word_prefix_docids.iter(rtxn)? {
let (prefix, docids) = result?;
let docids = if debug {
format!("{:?}", docids)
} else {
format!("{:?}", docids.iter().collect::<Vec<_>>())
};
wtr.write_record(&[prefix, &docids])?;
}
} else {
for prefix in prefixes {
if let Some(docids) = index.word_prefix_docids.get(rtxn, &prefix)? {
let docids = if debug {
format!("{:?}", docids)
} else {
format!("{:?}", docids.iter().collect::<Vec<_>>())
};
wtr.write_record(&[prefix, docids])?;
}
}
}
Ok(wtr.flush()?)
}
fn facet_values_docids(index: &Index, rtxn: &heed::RoTxn, debug: bool, field_name: String) -> anyhow::Result<()> {
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let faceted_fields = index.faceted_fields_ids(&rtxn)?;
@ -517,6 +605,16 @@ fn export_words_fst(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
Ok(())
}
fn export_words_prefix_fst(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
use std::io::Write as _;
let mut stdout = io::stdout();
let words_prefixes_fst = index.words_prefixes_fst(rtxn)?;
stdout.write_all(words_prefixes_fst.as_fst().as_bytes())?;
Ok(())
}
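As with the words FST, the exported bytes can be inspected programmatically with the `fst` crate. A minimal sketch, assuming the command output was redirected to a hypothetical words-prefixes.fst file:

use fst::Streamer;

fn main() -> anyhow::Result<()> {
    // Load the raw bytes written to stdout by the ExportWordsPrefixFst command.
    let bytes = std::fs::read("words-prefixes.fst")?;
    let prefixes = fst::Set::new(bytes)?;
    // Stream the set and print one prefix per line.
    let mut stream = prefixes.stream();
    while let Some(prefix) = stream.next() {
        println!("{}", String::from_utf8_lossy(prefix));
    }
    Ok(())
}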
fn export_documents(index: &Index, rtxn: &heed::RoTxn) -> anyhow::Result<()> {
use std::io::{BufWriter, Write as _};
use milli::obkv_to_json;
@ -627,9 +725,11 @@ fn size_of_database(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Re
let database = match name {
MAIN_DB_NAME => &index.main,
WORD_PREFIX_DOCIDS_DB_NAME => index.word_prefix_docids.as_polymorph(),
WORD_DOCIDS_DB_NAME => index.word_docids.as_polymorph(),
DOCID_WORD_POSITIONS_DB_NAME => index.docid_word_positions.as_polymorph(),
WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME => index.word_pair_proximity_docids.as_polymorph(),
WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME => index.word_prefix_pair_proximity_docids.as_polymorph(),
DOCUMENTS_DB_NAME => index.documents.as_polymorph(),
unknown => anyhow::bail!("unknown database {:?}", unknown),
};
@ -675,24 +775,21 @@ fn database_stats(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Resu
}
values_length.sort_unstable();
let len = values_length.len();
let median = values_length.len() / 2;
let quartile = values_length.len() / 4;
let percentile = values_length.len() / 100;
let twenty_five_percentile = values_length.get(quartile).unwrap_or(&0);
let fifty_percentile = values_length.get(median).unwrap_or(&0);
let seventy_five_percentile = values_length.get(quartile * 3).unwrap_or(&0);
let ninety_percentile = values_length.get(percentile * 90).unwrap_or(&0);
let ninety_five_percentile = values_length.get(percentile * 95).unwrap_or(&0);
let ninety_nine_percentile = values_length.get(percentile * 99).unwrap_or(&0);
let twenty_five_percentile = values_length.get(len / 4).unwrap_or(&0);
let fifty_percentile = values_length.get(len / 2).unwrap_or(&0);
let seventy_five_percentile = values_length.get(len * 3 / 4).unwrap_or(&0);
let ninety_percentile = values_length.get(len * 90 / 100).unwrap_or(&0);
let ninety_five_percentile = values_length.get(len * 95 / 100).unwrap_or(&0);
let ninety_nine_percentile = values_length.get(len * 99 / 100).unwrap_or(&0);
let minimum = values_length.first().unwrap_or(&0);
let maximum = values_length.last().unwrap_or(&0);
let count = values_length.len();
let sum = values_length.iter().map(|l| *l as u64).sum::<u64>();
println!("The {} database stats on the lengths", name);
println!("\tnumber of proximity pairs: {}", count);
println!("\tnumber of entries: {}", count);
println!("\t25th percentile (first quartile): {}", twenty_five_percentile);
println!("\t50th percentile (median): {}", fifty_percentile);
println!("\t75th percentile (third quartile): {}", seventy_five_percentile);
@ -714,6 +811,10 @@ fn database_stats(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Resu
let db = index.word_docids.as_polymorph();
compute_stats::<RoaringBitmapCodec>(*db, rtxn, name)
},
WORD_PREFIX_DOCIDS_DB_NAME => {
let db = index.word_prefix_docids.as_polymorph();
compute_stats::<RoaringBitmapCodec>(*db, rtxn, name)
},
DOCID_WORD_POSITIONS_DB_NAME => {
let db = index.docid_word_positions.as_polymorph();
compute_stats::<BoRoaringBitmapCodec>(*db, rtxn, name)
@ -722,6 +823,10 @@ fn database_stats(index: &Index, rtxn: &heed::RoTxn, name: &str) -> anyhow::Resu
let db = index.word_pair_proximity_docids.as_polymorph();
compute_stats::<CboRoaringBitmapCodec>(*db, rtxn, name)
},
WORD_PREFIX_PAIR_PROXIMITY_DOCIDS_DB_NAME => {
let db = index.word_prefix_pair_proximity_docids.as_polymorph();
compute_stats::<CboRoaringBitmapCodec>(*db, rtxn, name)
},
unknown => anyhow::bail!("unknown database {:?}", unknown),
}
}


@ -27,6 +27,7 @@ pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields";
pub const HARD_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "hard-external-documents-ids";
pub const SOFT_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "soft-external-documents-ids";
pub const WORDS_FST_KEY: &str = "words-fst";
pub const WORDS_PREFIXES_FST_KEY: &str = "words-prefixes-fst";
#[derive(Clone)]
pub struct Index {
@ -36,10 +37,14 @@ pub struct Index {
pub main: PolyDatabase,
/// A word and all the documents ids containing the word.
pub word_docids: Database<Str, RoaringBitmapCodec>,
/// A word prefix and all the documents ids containing words starting with this prefix.
pub word_prefix_docids: Database<Str, RoaringBitmapCodec>,
/// Maps a word and a document id (u32) to all the positions where the given word appears.
pub docid_word_positions: Database<BEU32StrCodec, BoRoaringBitmapCodec>,
/// Maps the proximity between a pair of words with all the docids where this relation appears.
pub word_pair_proximity_docids: Database<StrStrU8Codec, CboRoaringBitmapCodec>,
/// Maps the proximity between a word and a prefix with all the docids where this relation appears.
pub word_prefix_pair_proximity_docids: Database<StrStrU8Codec, CboRoaringBitmapCodec>,
/// Maps the facet field id and the globally ordered value with the docids that correspond to it.
pub facet_field_id_value_docids: Database<ByteSlice, CboRoaringBitmapCodec>,
/// Maps the document id, the facet field id and the globally ordered value.
@ -50,13 +55,15 @@ pub struct Index {
impl Index {
pub fn new<P: AsRef<Path>>(mut options: heed::EnvOpenOptions, path: P) -> anyhow::Result<Index> {
options.max_dbs(7);
options.max_dbs(9);
let env = options.open(path)?;
let main = env.create_poly_database(Some("main"))?;
let word_docids = env.create_database(Some("word-docids"))?;
let word_prefix_docids = env.create_database(Some("word-prefix-docids"))?;
let docid_word_positions = env.create_database(Some("docid-word-positions"))?;
let word_pair_proximity_docids = env.create_database(Some("word-pair-proximity-docids"))?;
let word_prefix_pair_proximity_docids = env.create_database(Some("word-prefix-pair-proximity-docids"))?;
let facet_field_id_value_docids = env.create_database(Some("facet-field-id-value-docids"))?;
let field_id_docid_facet_values = env.create_database(Some("field-id-docid-facet-values"))?;
let documents = env.create_database(Some("documents"))?;
@ -65,8 +72,10 @@ impl Index {
env,
main,
word_docids,
word_prefix_docids,
docid_word_positions,
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
facet_field_id_value_docids,
field_id_docid_facet_values,
documents,
@ -328,6 +337,23 @@ impl Index {
}
}
/* words prefixes fst */
/// Writes the FST which is the words prefixes dictionary of the engine.
pub fn put_words_prefixes_fst<A: AsRef<[u8]>>(&self, wtxn: &mut RwTxn, fst: &fst::Set<A>) -> heed::Result<()> {
self.main.put::<_, Str, ByteSlice>(wtxn, WORDS_PREFIXES_FST_KEY, fst.as_fst().as_bytes())
}
/// Returns the FST which is the words prefixes dictionary of the engine.
pub fn words_prefixes_fst<'t>(&self, rtxn: &'t RoTxn) -> anyhow::Result<fst::Set<Cow<'t, [u8]>>> {
match self.main.get::<_, Str, ByteSlice>(rtxn, WORDS_PREFIXES_FST_KEY)? {
Some(bytes) => Ok(fst::Set::new(bytes)?.map_data(Cow::Borrowed)?),
None => Ok(fst::Set::default().map_data(Cow::Owned)?),
}
}
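A hedged round-trip sketch of the two methods above (an open heed env and Index are assumed to be in scope; error handling via anyhow):

// Build a small, lexicographically sorted prefix set and persist it in the main database.
let mut wtxn = env.write_txn()?;
let prefixes = fst::Set::from_iter(vec!["he", "wo"])?;
index.put_words_prefixes_fst(&mut wtxn, &prefixes)?;
wtxn.commit()?;

// Read it back; an empty set is returned when the key is missing.
let rtxn = env.read_txn()?;
let prefixes = index.words_prefixes_fst(&rtxn)?;
assert!(prefixes.contains("he"));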
/* documents */
/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
pub fn documents<'t>(
&self,


@ -33,8 +33,8 @@ pub use self::update_store::UpdateStore;
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
pub type SmallString32 = smallstr::SmallString<[u8; 32]>;
pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>;
pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
pub type SmallVec8<T> = smallvec::SmallVec<[T; 8]>;
pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>;
pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;


@ -22,8 +22,10 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
env: _env,
main: _main,
word_docids,
word_prefix_docids,
docid_word_positions,
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
facet_field_id_value_docids,
field_id_docid_facet_values,
documents,
@ -35,6 +37,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
// We clean some of the main engine datastructures.
self.index.put_words_fst(self.wtxn, &fst::Set::default())?;
self.index.put_words_prefixes_fst(self.wtxn, &fst::Set::default())?;
self.index.put_external_documents_ids(self.wtxn, &ExternalDocumentsIds::default())?;
self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?;
@ -45,8 +48,10 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
// Clear the other databases.
word_docids.clear(self.wtxn)?;
word_prefix_docids.clear(self.wtxn)?;
docid_word_positions.clear(self.wtxn)?;
word_pair_proximity_docids.clear(self.wtxn)?;
word_prefix_pair_proximity_docids.clear(self.wtxn)?;
facet_field_id_value_docids.clear(self.wtxn)?;
field_id_docid_facet_values.clear(self.wtxn)?;
documents.clear(self.wtxn)?;


@ -79,8 +79,10 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
env: _env,
main: _main,
word_docids,
word_prefix_docids,
docid_word_positions,
word_pair_proximity_docids,
word_prefix_pair_proximity_docids,
facet_field_id_value_docids,
field_id_docid_facet_values,
documents,
@ -179,6 +181,61 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
// We write the new words FST into the main database.
self.index.put_words_fst(self.wtxn, &new_words_fst)?;
// We iterate over the word prefix docids database and remove the deleted documents ids
// from every docids list. We register the empty prefixes in an fst Set for future deletion.
let mut prefixes_to_delete = fst::SetBuilder::memory();
let mut iter = word_prefix_docids.iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (prefix, mut docids) = result?;
let previous_len = docids.len();
docids.difference_with(&self.documents_ids);
if docids.is_empty() {
iter.del_current()?;
prefixes_to_delete.insert(prefix)?;
} else if docids.len() != previous_len {
iter.put_current(prefix, &docids)?;
}
}
drop(iter);
// We compute the new prefix FST and write it only if there is a change.
let prefixes_to_delete = prefixes_to_delete.into_set();
if !prefixes_to_delete.is_empty() {
let new_words_prefixes_fst = {
// We retrieve the current words prefixes FST from the database.
let words_prefixes_fst = self.index.words_prefixes_fst(self.wtxn)?;
let difference = words_prefixes_fst.op().add(&prefixes_to_delete).difference();
// We stream the prefixes that remain, i.e. those that are not in the to-delete set.
let mut new_words_prefixes_fst_builder = fst::SetBuilder::memory();
new_words_prefixes_fst_builder.extend_stream(difference.into_stream())?;
// We create a words prefixes FST set from the above builder.
new_words_prefixes_fst_builder.into_set()
};
// We write the new words prefixes FST into the main database.
self.index.put_words_prefixes_fst(self.wtxn, &new_words_prefixes_fst)?;
}
// We delete the documents ids from the word prefix pair proximity docids database
// and remove the now-empty pairs too.
let db = word_prefix_pair_proximity_docids.remap_key_type::<ByteSlice>();
let mut iter = db.iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (key, mut docids) = result?;
let previous_len = docids.len();
docids.difference_with(&self.documents_ids);
if docids.is_empty() {
iter.del_current()?;
} else if docids.len() != previous_len {
iter.put_current(key, &docids)?;
}
}
drop(iter);
// We delete the documents ids that are under the pairs of words,
// it is faster and uses no memory to iterate over all the word pairs than
// to compute the cartesian product of every word of the deleted documents.


@ -32,7 +32,8 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> Facets<'t, 'u, 'i> {
) -> Facets<'t, 'u, 'i>
{
Facets {
wtxn,
index,


@ -8,7 +8,7 @@ use std::time::Instant;
use anyhow::Context;
use bstr::ByteSlice as _;
use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use grenad::{MergerIter, Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
use heed::types::ByteSlice;
use log::{debug, info, error};
use memmap::Mmap;
@ -17,9 +17,9 @@ use rayon::prelude::*;
use serde::{Serialize, Deserialize};
use crate::index::Index;
use crate::update::{Facets, UpdateIndexingStep};
use crate::update::{Facets, WordsPrefixes, UpdateIndexingStep};
use self::store::{Store, Readers};
use self::merge_function::{
pub use self::merge_function::{
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
docid_word_positions_merge, documents_merge, facet_field_value_docids_merge,
field_id_docid_facet_values_merge,
@ -102,39 +102,19 @@ pub fn merge_into_lmdb_database(
sources: Vec<Reader<FileFuse>>,
merge: MergeFn,
method: WriteMethod,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
{
debug!("Merging {} MTBL stores...", sources.len());
let before = Instant::now();
let merger = merge_readers(sources, merge);
let mut in_iter = merger.into_merge_iter()?;
match method {
WriteMethod::Append => {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = in_iter.next()? {
out_iter.append(k, v).with_context(|| {
format!("writing {:?} into LMDB", k.as_bstr())
})?;
}
},
WriteMethod::GetMergePut => {
while let Some((k, v)) = in_iter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).expect("merge failed");
iter.put_current(k, &val)?;
},
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
},
}
}
},
}
merger_iter_into_lmdb_database(
wtxn,
database,
merger.into_merge_iter()?,
merge,
method,
)?;
debug!("MTBL stores merged in {:.02?}!", before.elapsed());
Ok(())
@ -146,7 +126,8 @@ pub fn write_into_lmdb_database(
mut reader: Reader<FileFuse>,
merge: MergeFn,
method: WriteMethod,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
{
debug!("Writing MTBL stores...");
let before = Instant::now();
@ -181,6 +162,67 @@ pub fn write_into_lmdb_database(
Ok(())
}
pub fn sorter_into_lmdb_database(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
sorter: Sorter<MergeFn>,
merge: MergeFn,
method: WriteMethod,
) -> anyhow::Result<()>
{
debug!("Writing MTBL sorter...");
let before = Instant::now();
merger_iter_into_lmdb_database(
wtxn,
database,
sorter.into_iter()?,
merge,
method,
)?;
debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
Ok(())
}
fn merger_iter_into_lmdb_database<R: io::Read>(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
mut sorter: MergerIter<R, MergeFn>,
merge: MergeFn,
method: WriteMethod,
) -> anyhow::Result<()>
{
match method {
WriteMethod::Append => {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = sorter.next()? {
out_iter.append(k, v).with_context(|| {
format!("writing {:?} into LMDB", k.as_bstr())
})?;
}
},
WriteMethod::GetMergePut => {
while let Some((k, v)) = sorter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).expect("merge failed");
iter.put_current(k, &val)?;
},
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
},
}
}
},
}
Ok(())
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[non_exhaustive]
pub enum IndexDocumentsMethod {
@ -217,6 +259,8 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> {
pub(crate) thread_pool: Option<&'a ThreadPool>,
facet_level_group_size: Option<NonZeroUsize>,
facet_min_level_size: Option<NonZeroUsize>,
words_prefix_threshold: Option<f64>,
max_prefix_length: Option<usize>,
update_method: IndexDocumentsMethod,
update_format: UpdateFormat,
autogenerate_docids: bool,
@ -242,6 +286,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
thread_pool: None,
facet_level_group_size: None,
facet_min_level_size: None,
words_prefix_threshold: None,
max_prefix_length: None,
update_method: IndexDocumentsMethod::ReplaceDocuments,
update_format: UpdateFormat::Json,
autogenerate_docids: true,
@ -625,6 +671,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
});
}
// Run the facets update operation.
let mut builder = Facets::new(self.wtxn, self.index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
@ -637,6 +684,19 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
}
builder.execute()?;
// Run the words prefixes update operation.
let mut builder = WordsPrefixes::new(self.wtxn, self.index, self.update_id);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
if let Some(value) = self.words_prefix_threshold {
builder.threshold(value);
}
if let Some(value) = self.max_prefix_length {
builder.max_prefix_length(value);
}
builder.execute()?;
debug_assert_eq!(database_count, total_databases);
info!("Transform output indexed in {:.02?}", before_indexing.elapsed());


@ -6,12 +6,14 @@ mod index_documents;
mod settings;
mod update_builder;
mod update_step;
mod words_prefixes;
pub use self::available_documents_ids::AvailableDocumentsIds;
pub use self::clear_documents::ClearDocuments;
pub use self::delete_documents::DeleteDocuments;
pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
pub use self::facets::Facets;
pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
pub use self::settings::Settings;
pub use self::update_builder::UpdateBuilder;
pub use self::update_step::UpdateIndexingStep;
pub use self::words_prefixes::WordsPrefixes;


@ -0,0 +1,194 @@
use std::iter::FromIterator;
use std::str;
use fst::automaton::Str;
use fst::{Automaton, Streamer, IntoStreamer};
use grenad::CompressionType;
use heed::BytesEncode;
use heed::types::ByteSlice;
use crate::heed_codec::StrStrU8Codec;
use crate::update::index_documents::WriteMethod;
use crate::update::index_documents::{create_sorter, sorter_into_lmdb_database};
use crate::update::index_documents::{word_docids_merge, words_pairs_proximities_docids_merge};
use crate::{Index, SmallString32};
pub struct WordsPrefixes<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) chunk_fusing_shrink_size: Option<u64>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
threshold: f64,
max_prefix_length: usize,
_update_id: u64,
}
impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
update_id: u64,
) -> WordsPrefixes<'t, 'u, 'i>
{
WordsPrefixes {
wtxn,
index,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
chunk_fusing_shrink_size: None,
max_nb_chunks: None,
max_memory: None,
threshold: 0.01, // 1%
max_prefix_length: 4,
_update_id: update_id,
}
}
/// Set the ratio of words a prefix must cover to be part of the words prefixes
/// database. If a word prefix matches at least this ratio of the words in the
/// dictionary, it is added to the words prefixes datastructures.
///
/// Default value is `0.01` or `1%`. This value must be between 0 and 1 and will be clamped
/// to these bounds otherwise.
pub fn threshold(&mut self, value: f64) -> &mut Self {
self.threshold = value.min(1.0).max(0.0); // clamp [0, 1]
self
}
/// Set the maximum length of prefixes in bytes.
///
/// Default value is `4` bytes. This value must be between 1 and 25 and will be
/// clamped to these bounds otherwise.
pub fn max_prefix_length(&mut self, value: usize) -> &mut Self {
self.max_prefix_length = value.min(25).max(1); // clamp [1, 25]
self
}
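A hedged usage sketch of this builder (a write transaction, index, and update id are assumed to be in scope; the values are illustrative):

let mut builder = WordsPrefixes::new(wtxn, index, update_id);
// Keep a prefix only if it matches at least 5% of the dictionary words.
builder.threshold(0.05);
// Consider prefixes of at most 3 bytes instead of the default 4.
builder.max_prefix_length(3);
builder.execute()?;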
pub fn execute(self) -> anyhow::Result<()> {
// Clear the words prefixes datastructures.
self.index.word_prefix_docids.clear(self.wtxn)?;
self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
let words_fst = self.index.words_fst(&self.wtxn)?;
let number_of_words = words_fst.len();
let min_number_of_words = (number_of_words as f64 * self.threshold) as usize;
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(
word_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
let mut prefix_fsts = Vec::with_capacity(self.max_prefix_length);
for n in 1..=self.max_prefix_length {
let mut current_prefix = SmallString32::new();
let mut current_prefix_count = 0;
let mut builder = fst::SetBuilder::memory();
let mut stream = words_fst.stream();
while let Some(bytes) = stream.next() {
// We try to get the first n bytes out of this string but we only want
// to split at valid character bounds. If we try to split in the middle of
// a character we ignore this word and go to the next one.
let word = str::from_utf8(bytes)?;
let prefix = match word.get(..n) {
Some(prefix) => prefix,
None => continue,
};
// This is the first iteration of the loop,
// or the current word doesn't start with the current prefix.
if current_prefix_count == 0 || prefix != current_prefix.as_str() {
current_prefix = SmallString32::from(prefix);
current_prefix_count = 0;
}
current_prefix_count += 1;
// There are enough words corresponding to this prefix to add it to the cache.
if current_prefix_count == min_number_of_words {
builder.insert(prefix)?;
}
}
// We construct the final set for prefixes of size n.
prefix_fsts.push(builder.into_set());
}
// We merge all of the previously computed prefixes into one final set.
let op = fst::set::OpBuilder::from_iter(prefix_fsts.iter());
let mut builder = fst::SetBuilder::memory();
builder.extend_stream(op.r#union())?;
let prefix_fst = builder.into_set();
// We iterate over all the prefixes and retrieve the corresponding docids.
let mut prefix_stream = prefix_fst.stream();
while let Some(bytes) = prefix_stream.next() {
let prefix = str::from_utf8(bytes)?;
let db = self.index.word_docids.remap_data_type::<ByteSlice>();
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
prefix_docids_sorter.insert(prefix, data)?;
}
}
// Set the words prefixes FST in the database.
self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?;
// We finally write the word prefix docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_docids.as_polymorph(),
prefix_docids_sorter,
word_docids_merge,
WriteMethod::Append,
)?;
// We compute the word prefix pair proximity database.
// Here we create a sorter akin to the previous one.
let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
words_pairs_proximities_docids_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
// We insert all the word pairs corresponding to the word-prefix pairs
// where the prefix appears in the prefix FST previously constructed.
let db = self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>();
for result in db.iter(self.wtxn)? {
let ((word1, word2, prox), data) = result?;
let automaton = Str::new(word2).starts_with();
let mut matching_prefixes = prefix_fst.search(automaton).into_stream();
while let Some(prefix) = matching_prefixes.next() {
let prefix = str::from_utf8(prefix)?;
let pair = (word1, prefix, prox);
let bytes = StrStrU8Codec::bytes_encode(&pair).unwrap();
word_prefix_pair_proximity_docids_sorter.insert(bytes, data)?;
}
}
// We finally write the word prefix pair proximity docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_pair_proximity_docids.as_polymorph(),
word_prefix_pair_proximity_docids_sorter,
words_pairs_proximities_docids_merge,
WriteMethod::Append,
)?;
Ok(())
}
}
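To make the threshold rule in `execute` concrete, here is a toy rendition of the counting step; it uses a plain map instead of streaming a sorted FST, and the names are hypothetical:

use std::collections::BTreeMap;

// With 4 words and a 0.5 threshold, a prefix must cover at least 2 words.
let words = ["hello", "help", "hero", "world"];
let min_number_of_words = (words.len() as f64 * 0.5) as usize;
let mut kept = Vec::new();
for n in 1..=2 {
    let mut counts = BTreeMap::new();
    for word in &words {
        // Split only at valid character bounds, as in the loop above.
        if let Some(prefix) = word.get(..n) {
            *counts.entry(prefix).or_insert(0usize) += 1;
        }
    }
    kept.extend(counts.into_iter().filter(|&(_, c)| c >= min_number_of_words).map(|(p, _)| p));
}
// "h" and "he" each cover 3 of the 4 words; "w" and "wo" cover only 1.
assert_eq!(kept, vec!["h", "he"]);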