436: Speed up the word prefix databases computation time r=Kerollmops a=Kerollmops

This PR depends on the fixes done in #431 and must be merged after it.

In this PR we bring the `WordPrefixPairProximityDocids`, `WordPrefixDocids`, and `WordPrefixPositionDocids` update structures to a new era, a better era, where computing the word prefix pair proximities costs far fewer CPU cycles: an era where these update structures can reuse the previously computed set of new word docids from the newly indexed batch of documents.

---

The `WordPrefixPairProximityDocids` is an update structure, which means that it is an object that we feed with some parameters and that modifies the LMDB database of an index when asked to. This structure specifically computes the list of word prefix pair proximities, which is a list of pairs associated with a proximity (the distance between both words) where the second element is not a full word but a prefix, e.g. `s`, `se`, `a`. Each word prefix pair proximity is associated with the list of ids of the documents that contain the word and the prefix at the given proximity.
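
To make the shape of this data concrete, here is a simplified, purely illustrative in-memory sketch of such an entry (the real database lives in LMDB behind dedicated key codecs and stores the document ids as a roaring bitmap):

```rust
use std::collections::HashMap;

use roaring::RoaringBitmap;

fn main() {
    // Simplified view of the database: the key is (word, prefix, proximity)
    // and the value is the set of ids of the documents where the word appears
    // close to a word starting with the prefix, at that proximity.
    let mut word_prefix_pair_proximity_docids: HashMap<(String, String, u8), RoaringBitmap> =
        HashMap::new();

    // e.g. "nice" followed, at proximity 1, by a word starting with "s"
    // ("shoes", "shirt", ...) appears in documents 2 and 7.
    let key = ("nice".to_string(), "s".to_string(), 1u8);
    let mut docids = RoaringBitmap::new();
    docids.insert(2);
    docids.insert(7);
    word_prefix_pair_proximity_docids.insert(key.clone(), docids);

    assert!(word_prefix_pair_proximity_docids[&key].contains(7));
}
```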

The origin of the performance issue with this struct is that it starts its job from the beginning: it clears the LMDB database before rewriting everything from scratch, using the other LMDB databases to do so. I hope you understand that this is absolutely not an optimized way of doing things.
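
The fix, which you will see repeated throughout the diff below, is to compare the previous and the new words prefixes FST and to split the prefixes into three sets, the common ones, the newly added ones, and the deleted ones, and then only do the work each set actually requires. Here is a minimal, self-contained sketch of that set arithmetic with the `fst` crate; the `collect` helper simply mirrors the `fst_stream_into_vec`/`fst_stream_into_hashset` helpers added in this PR:

```rust
use fst::{IntoStreamer, Set, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Word prefix fsts before and after indexing the new batch of documents.
    let previous = Set::from_iter(vec!["a", "se"])?;
    let current = Set::from_iter(vec!["a", "s", "se"])?;

    // Prefixes present in both fsts: only extended with the new word docids.
    let common = collect(previous.op().add(&current).intersection());
    // Prefixes that only exist in the new fst: fully computed from the database.
    let new = collect(current.op().add(&previous).difference());
    // Prefixes that disappeared: their database entries are deleted.
    let deleted = collect(previous.op().add(&current).difference());

    assert_eq!(common, vec![b"a".to_vec(), b"se".to_vec()]);
    assert_eq!(new, vec![b"s".to_vec()]);
    assert!(deleted.is_empty());
    Ok(())
}

// Drains an fst stream into a Vec of keys, like the helpers added in this PR.
fn collect<'f, I, S>(stream: I) -> Vec<Vec<u8>>
where
    I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>,
    S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>,
{
    let mut keys = Vec::new();
    let mut stream = stream.into_stream();
    while let Some(key) = stream.next() {
        keys.push(key.to_owned());
    }
    keys
}
```

The common prefixes are then extended using only the word docids extracted from the new batch of documents, the new prefixes are computed from the database, and the deleted prefixes are simply removed from the prefix databases.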

Co-authored-by: Clément Renault <clement@meilisearch.com>
Co-authored-by: Kerollmops <clement@meilisearch.com>
bors[bot] 2022-02-16 15:41:14 +00:00 committed by GitHub
commit 25123af3b8
21 changed files with 572 additions and 259 deletions

View File

@ -15,7 +15,7 @@ either = "1.6.1"
flate2 = "1.0.20"
fst = "0.4.5"
fxhash = "0.2.1"
grenad = { version = "0.3.1", default-features = false, features = ["tempfile"] }
grenad = { version = "0.4.1", default-features = false, features = ["tempfile"] }
geoutils = "0.4.1"
heed = { git = "https://github.com/Kerollmops/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] }
human_format = "1.0.3"
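
Most of the mechanical changes in this diff follow from this grenad 0.3 → 0.4 bump: readers are no longer iterated in place with `next()` but through a cursor, which is why an `io::Seek` bound now appears on every extractor. Below is a minimal sketch of that 0.4 reading pattern, assuming an in-memory buffer instead of the temporary files milli actually uses:

```rust
use std::io;

// Iterate a grenad 0.4 reader through a cursor instead of calling `next()`
// on the reader itself, hence the `io::Seek` bound.
fn dump_entries<R: io::Read + io::Seek>(
    reader: grenad::Reader<R>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut cursor = reader.into_cursor()?;
    while let Some((key, value)) = cursor.move_on_next()? {
        println!("{} key bytes, {} value bytes", key.len(), value.len());
    }
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a single entry into an in-memory buffer, then read it back.
    let mut writer = grenad::Writer::builder().build(Vec::new());
    writer.insert(b"hello", b"world")?;
    let bytes = writer.into_inner()?;
    dump_entries(grenad::Reader::new(io::Cursor::new(bytes))?)
}
```

The `create_writer` helper in the diff below wraps exactly this builder call, just with compression parameters and a `tempfile::tempfile()?` instead of a `Vec<u8>`.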

View File

@ -29,6 +29,7 @@ pub enum InternalError {
FieldIdMapMissingEntry(FieldIdMapMissingEntry),
Fst(fst::Error),
GrenadInvalidCompressionType,
GrenadInvalidFormatVersion,
IndexingMergingKeys { process: &'static str },
InvalidDatabaseTyping,
RayonThreadPool(ThreadPoolBuildError),
@ -97,6 +98,9 @@ where
grenad::Error::InvalidCompressionType => {
Error::InternalError(InternalError::GrenadInvalidCompressionType)
}
grenad::Error::InvalidFormatVersion => {
Error::InternalError(InternalError::GrenadInvalidFormatVersion)
}
}
}
}
@ -186,6 +190,9 @@ impl fmt::Display for InternalError {
Self::GrenadInvalidCompressionType => {
f.write_str("Invalid compression type have been specified to grenad.")
}
Self::GrenadInvalidFormatVersion => {
f.write_str("Invalid grenad file with an invalid version format.")
}
Self::IndexingMergingKeys { process } => {
write!(f, "Invalid merge while processing {}.", process)
}

View File

@ -15,9 +15,7 @@ use crate::heed_codec::facet::{
FacetStringLevelZeroValueCodec, FacetStringZeroBoundsValueCodec,
};
use crate::heed_codec::CboRoaringBitmapCodec;
use crate::update::index_documents::{
create_writer, write_into_lmdb_database, writer_into_reader, WriteMethod,
};
use crate::update::index_documents::{create_writer, write_into_lmdb_database, writer_into_reader};
use crate::{FieldId, Index, Result};
pub struct Facets<'t, 'u, 'i> {
@ -120,7 +118,6 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
*self.index.facet_id_f64_docids.as_polymorph(),
facet_number_levels,
|_, _| Err(InternalError::IndexingMergingKeys { process: "facet number levels" })?,
WriteMethod::GetMergePut,
)?;
write_into_lmdb_database(
@ -128,7 +125,6 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> {
*self.index.facet_id_string_docids.as_polymorph(),
facet_string_levels,
|_, _| Err(InternalError::IndexingMergingKeys { process: "facet string levels" })?,
WriteMethod::GetMergePut,
)?;
}
@ -164,8 +160,7 @@ fn compute_facet_number_levels<'t>(
// It is forbidden to keep a cursor and write in a database at the same time with LMDB
// therefore we write the facet levels entries into a grenad file before transfering them.
let mut writer = tempfile::tempfile()
.and_then(|file| create_writer(compression_type, compression_level, file))?;
let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?);
let level_0_range = {
let left = (field_id, 0, f64::MIN, f64::MIN);
@ -283,8 +278,7 @@ fn compute_facet_string_levels<'t>(
// It is forbidden to keep a cursor and write in a database at the same time with LMDB
// therefore we write the facet levels entries into a grenad file before transfering them.
let mut writer = tempfile::tempfile()
.and_then(|file| create_writer(compression_type, compression_level, file))?;
let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?);
// Groups sizes are always a power of the original level_group_size and therefore a group
// always maps groups of the previous level and never splits previous levels groups in half.

View File

@ -18,8 +18,8 @@ use crate::{absolute_from_relative_position, FieldId, Result, MAX_POSITION_PER_A
/// Returns the generated internal documents ids and a grenad reader
/// with the list of extracted words from the given chunk of documents.
#[logging_timer::time]
pub fn extract_docid_word_positions<R: io::Read>(
mut obkv_documents: grenad::Reader<R>,
pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters,
searchable_fields: &Option<HashSet<FieldId>>,
stop_words: Option<&fst::Set<&[u8]>>,
@ -46,7 +46,8 @@ pub fn extract_docid_word_positions<R: io::Read>(
}
let analyzer = Analyzer::<Vec<u8>>::new(AnalyzerConfig::default());
while let Some((key, value)) = obkv_documents.next()? {
let mut cursor = obkv_documents.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let document_id = key
.try_into()
.map(u32::from_be_bytes)

View File

@ -14,8 +14,8 @@ use crate::Result;
/// Returns a grenad reader with the list of extracted facet numbers and
/// documents ids from the given chunk of docid facet number positions.
#[logging_timer::time]
pub fn extract_facet_number_docids<R: io::Read>(
mut docid_fid_facet_number: grenad::Reader<R>,
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
@ -28,7 +28,8 @@ pub fn extract_facet_number_docids<R: io::Read>(
max_memory,
);
while let Some((key_bytes, _)) = docid_fid_facet_number.next()? {
let mut cursor = docid_fid_facet_number.into_cursor()?;
while let Some((key_bytes, _)) = cursor.move_on_next()? {
let (field_id, document_id, number) =
FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();

View File

@ -16,8 +16,8 @@ use crate::{FieldId, Result};
/// Returns a grenad reader with the list of extracted facet strings and
/// documents ids from the given chunk of docid facet string positions.
#[logging_timer::time]
pub fn extract_facet_string_docids<R: io::Read>(
mut docid_fid_facet_string: grenad::Reader<R>,
pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
docid_fid_facet_string: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
@ -32,7 +32,8 @@ pub fn extract_facet_string_docids<R: io::Read>(
let mut key_buffer = Vec::new();
let mut value_buffer = Vec::new();
while let Some((key, original_value_bytes)) = docid_fid_facet_string.next()? {
let mut cursor = docid_fid_facet_string.into_cursor()?;
while let Some((key, original_value_bytes)) = cursor.move_on_next()? {
let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
let (document_id_bytes, normalized_value_bytes) = try_split_array_at(bytes).unwrap();

View File

@ -16,8 +16,8 @@ use crate::{DocumentId, FieldId, Result};
/// Returns the generated grenad reader containing the docid the fid and the orginal value as key
/// and the normalized value as value extracted from the given chunk of documents.
#[logging_timer::time]
pub fn extract_fid_docid_facet_values<R: io::Read>(
mut obkv_documents: grenad::Reader<R>,
pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters,
faceted_fields: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
@ -40,7 +40,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read>(
);
let mut key_buffer = Vec::new();
while let Some((docid_bytes, value)) = obkv_documents.next()? {
let mut cursor = obkv_documents.into_cursor()?;
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
let obkv = obkv::KvReader::new(value);
for (field_id, field_bytes) in obkv.iter() {

View File

@ -18,8 +18,8 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
/// Returns a grenad reader with the list of extracted field id word counts
/// and documents ids from the given chunk of docid word positions.
#[logging_timer::time]
pub fn extract_fid_word_count_docids<R: io::Read>(
mut docid_word_positions: grenad::Reader<R>,
pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
@ -36,7 +36,8 @@ pub fn extract_fid_word_count_docids<R: io::Read>(
let mut document_fid_wordcount = HashMap::new();
let mut current_document_id = None;
while let Some((key, value)) = docid_word_positions.next()? {
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, _word_bytes) = try_split_array_at(key)
.ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);

View File

@ -10,17 +10,20 @@ use crate::{FieldId, InternalError, Result, UserError};
/// Extracts the geographical coordinates contained in each document under the `_geo` field.
///
/// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude)
pub fn extract_geo_points<R: io::Read>(
mut obkv_documents: grenad::Reader<R>,
pub fn extract_geo_points<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters,
primary_key_id: FieldId,
geo_field_id: FieldId,
) -> Result<grenad::Reader<File>> {
let mut writer = tempfile::tempfile().and_then(|file| {
create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
})?;
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
while let Some((docid_bytes, value)) = obkv_documents.next()? {
let mut cursor = obkv_documents.into_cursor()?;
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
let obkv = obkv::KvReader::new(value);
let point: Value = match obkv.get(geo_field_id) {
Some(point) => serde_json::from_slice(point).map_err(InternalError::SerdeJson)?,

View File

@ -17,8 +17,8 @@ use crate::Result;
/// Returns a grenad reader with the list of extracted words and
/// documents ids from the given chunk of docid word positions.
#[logging_timer::time]
pub fn extract_word_docids<R: io::Read>(
mut docid_word_positions: grenad::Reader<R>,
pub fn extract_word_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
@ -32,7 +32,8 @@ pub fn extract_word_docids<R: io::Read>(
);
let mut value_buffer = Vec::new();
while let Some((key, _value)) = docid_word_positions.next()? {
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, _value)) = cursor.move_on_next()? {
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);

View File

@ -17,8 +17,8 @@ use crate::{DocumentId, Result};
/// Returns a grenad reader with the list of extracted word pairs proximities and
/// documents ids from the given chunk of docid word positions.
#[logging_timer::time]
pub fn extract_word_pair_proximity_docids<R: io::Read>(
mut docid_word_positions: grenad::Reader<R>,
pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
@ -35,7 +35,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read>(
let mut document_word_positions_heap = BinaryHeap::new();
let mut current_document_id = None;
while let Some((key, value)) = docid_word_positions.next()? {
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);

View File

@ -14,8 +14,8 @@ use crate::{DocumentId, Result};
/// Returns a grenad reader with the list of extracted words at positions and
/// documents ids from the given chunk of docid word positions.
#[logging_timer::time]
pub fn extract_word_position_docids<R: io::Read>(
mut docid_word_positions: grenad::Reader<R>,
pub fn extract_word_position_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let max_memory = indexer.max_memory_by_thread();
@ -29,7 +29,8 @@ pub fn extract_word_position_docids<R: io::Read>(
);
let mut key_buffer = Vec::new();
while let Some((key, value)) = docid_word_positions.next()? {
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = DocumentId::from_be_bytes(document_id_bytes);

View File

@ -25,7 +25,7 @@ use self::extract_word_docids::extract_word_docids;
use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{
into_clonable_grenad, keep_first_prefix_value_merge_roaring_bitmaps, merge_cbo_roaring_bitmaps,
as_cloneable_grenad, keep_first_prefix_value_merge_roaring_bitmaps, merge_cbo_roaring_bitmaps,
merge_readers, merge_roaring_bitmaps, CursorClonableMmap, GrenadParameters, MergeFn,
};
use super::{helpers, TypedChunk};
@ -184,7 +184,7 @@ fn extract_documents_data(
grenad::Reader<CursorClonableMmap>,
(grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>),
)> {
let documents_chunk = documents_chunk.and_then(|c| unsafe { into_clonable_grenad(c) })?;
let documents_chunk = documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
let _ = lmdb_writer_sx.send(Ok(TypedChunk::Documents(documents_chunk.clone())));
@ -217,7 +217,7 @@ fn extract_documents_data(
// send docid_word_positions_chunk to DB writer
let docid_word_positions_chunk =
unsafe { into_clonable_grenad(docid_word_positions_chunk)? };
unsafe { as_cloneable_grenad(&docid_word_positions_chunk)? };
let _ = lmdb_writer_sx
.send(Ok(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone())));
@ -233,7 +233,7 @@ fn extract_documents_data(
// send docid_fid_facet_numbers_chunk to DB writer
let docid_fid_facet_numbers_chunk =
unsafe { into_clonable_grenad(docid_fid_facet_numbers_chunk)? };
unsafe { as_cloneable_grenad(&docid_fid_facet_numbers_chunk)? };
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers(
docid_fid_facet_numbers_chunk.clone(),
@ -241,7 +241,7 @@ fn extract_documents_data(
// send docid_fid_facet_strings_chunk to DB writer
let docid_fid_facet_strings_chunk =
unsafe { into_clonable_grenad(docid_fid_facet_strings_chunk)? };
unsafe { as_cloneable_grenad(&docid_fid_facet_strings_chunk)? };
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings(
docid_fid_facet_strings_chunk.clone(),

View File

@ -9,7 +9,6 @@ use log::debug;
use super::{ClonableMmap, MergeFn};
use crate::error::InternalError;
use crate::update::index_documents::WriteMethod;
use crate::Result;
pub type CursorClonableMmap = io::Cursor<ClonableMmap>;
@ -18,7 +17,7 @@ pub fn create_writer<R: io::Write>(
typ: grenad::CompressionType,
level: Option<u32>,
file: R,
) -> io::Result<grenad::Writer<R>> {
) -> grenad::Writer<R> {
let mut builder = grenad::Writer::builder();
builder.compression_type(typ);
if let Some(level) = level {
@ -53,10 +52,13 @@ pub fn sorter_into_reader(
sorter: grenad::Sorter<MergeFn>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let mut writer = tempfile::tempfile().and_then(|file| {
create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
})?;
sorter.write_into(&mut writer)?;
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
sorter.write_into_stream_writer(&mut writer)?;
Ok(writer_into_reader(writer)?)
}
@ -66,30 +68,35 @@ pub fn writer_into_reader(writer: grenad::Writer<File>) -> Result<grenad::Reader
grenad::Reader::new(file).map_err(Into::into)
}
pub unsafe fn into_clonable_grenad(
reader: grenad::Reader<File>,
pub unsafe fn as_cloneable_grenad(
reader: &grenad::Reader<File>,
) -> Result<grenad::Reader<CursorClonableMmap>> {
let file = reader.into_inner();
let mmap = memmap2::Mmap::map(&file)?;
let file = reader.get_ref();
let mmap = memmap2::Mmap::map(file)?;
let cursor = io::Cursor::new(ClonableMmap::from(mmap));
let reader = grenad::Reader::new(cursor)?;
Ok(reader)
}
pub fn merge_readers<R: io::Read>(
pub fn merge_readers<R: io::Read + io::Seek>(
readers: Vec<grenad::Reader<R>>,
merge_fn: MergeFn,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let mut merger_builder = grenad::MergerBuilder::new(merge_fn);
merger_builder.extend(readers);
for reader in readers {
merger_builder.push(reader.into_cursor()?);
}
let merger = merger_builder.build();
let mut writer = tempfile::tempfile().and_then(|file| {
create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
})?;
merger.write_into(&mut writer)?;
let reader = writer_into_reader(writer)?;
Ok(reader)
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
merger.write_into_stream_writer(&mut writer)?;
Ok(writer_into_reader(writer)?)
}
#[derive(Debug, Clone, Copy)]
@ -126,12 +133,13 @@ impl GrenadParameters {
/// The grenad obkv entries are composed of an incremental document id big-endian
/// encoded as the key and an obkv object with an `u8` for the field as the key
/// and a simple UTF-8 encoded string as the value.
pub fn grenad_obkv_into_chunks<R: io::Read>(
mut reader: grenad::Reader<R>,
pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
reader: grenad::Reader<R>,
indexer: GrenadParameters,
documents_chunk_size: usize,
) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> {
let mut continue_reading = true;
let mut cursor = reader.into_cursor()?;
let indexer_clone = indexer.clone();
let mut transposer = move || {
@ -140,15 +148,13 @@ pub fn grenad_obkv_into_chunks<R: io::Read>(
}
let mut current_chunk_size = 0u64;
let mut obkv_documents = tempfile::tempfile().and_then(|file| {
create_writer(
indexer_clone.chunk_compression_type,
indexer_clone.chunk_compression_level,
file,
)
})?;
let mut obkv_documents = create_writer(
indexer_clone.chunk_compression_type,
indexer_clone.chunk_compression_level,
tempfile::tempfile()?,
);
while let Some((document_id, obkv)) = reader.next()? {
while let Some((document_id, obkv)) = cursor.move_on_next()? {
obkv_documents.insert(document_id, obkv)?;
current_chunk_size += document_id.len() as u64 + obkv.len() as u64;
@ -167,36 +173,25 @@ pub fn grenad_obkv_into_chunks<R: io::Read>(
pub fn write_into_lmdb_database(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
mut reader: Reader<File>,
reader: Reader<File>,
merge: MergeFn,
method: WriteMethod,
) -> Result<()> {
debug!("Writing MTBL stores...");
let before = Instant::now();
match method {
WriteMethod::Append => {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = reader.next()? {
let mut cursor = reader.into_cursor()?;
while let Some((k, v)) = cursor.move_on_next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = &[Cow::Borrowed(old_val), Cow::Borrowed(v)][..];
let val = merge(k, &vals)?;
// safety: we don't keep references from inside the LMDB database.
unsafe { out_iter.append(k, v)? };
unsafe { iter.put_current(k, &val)? };
}
}
WriteMethod::GetMergePut => {
while let Some((k, v)) = reader.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = &[Cow::Borrowed(old_val), Cow::Borrowed(v)][..];
let val = merge(k, &vals)?;
// safety: we don't keep references from inside the LMDB database.
unsafe { iter.put_current(k, &val)? };
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
}
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
}
}
}
@ -210,50 +205,37 @@ pub fn sorter_into_lmdb_database(
database: heed::PolyDatabase,
sorter: Sorter<MergeFn>,
merge: MergeFn,
method: WriteMethod,
) -> Result<()> {
debug!("Writing MTBL sorter...");
let before = Instant::now();
merger_iter_into_lmdb_database(wtxn, database, sorter.into_merger_iter()?, merge, method)?;
merger_iter_into_lmdb_database(wtxn, database, sorter.into_stream_merger_iter()?, merge)?;
debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
Ok(())
}
fn merger_iter_into_lmdb_database<R: io::Read>(
fn merger_iter_into_lmdb_database<R: io::Read + io::Seek>(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
mut sorter: MergerIter<R, MergeFn>,
mut merger_iter: MergerIter<R, MergeFn>,
merge: MergeFn,
method: WriteMethod,
) -> Result<()> {
match method {
WriteMethod::Append => {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = sorter.next()? {
while let Some((k, v)) = merger_iter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).map_err(|_| {
// TODO just wrap this error?
InternalError::IndexingMergingKeys { process: "get-put-merge" }
})?;
// safety: we don't keep references from inside the LMDB database.
unsafe { out_iter.append(k, v)? };
unsafe { iter.put_current(k, &val)? };
}
}
WriteMethod::GetMergePut => {
while let Some((k, v)) = sorter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).map_err(|_| {
// TODO just wrap this error?
InternalError::IndexingMergingKeys { process: "get-put-merge" }
})?;
// safety: we don't keep references from inside the LMDB database.
unsafe { iter.put_current(k, &val)? };
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
}
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
}
}
}

View File

@ -2,11 +2,13 @@ mod clonable_mmap;
mod grenad_helpers;
mod merge_functions;
use std::collections::HashSet;
use std::convert::{TryFrom, TryInto};
pub use clonable_mmap::{ClonableMmap, CursorClonableMmap};
use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::{
create_sorter, create_writer, grenad_obkv_into_chunks, into_clonable_grenad, merge_readers,
as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, merge_readers,
sorter_into_lmdb_database, sorter_into_reader, write_into_lmdb_database, writer_into_reader,
GrenadParameters,
};
@ -43,3 +45,32 @@ where
pub fn read_u32_ne_bytes(bytes: &[u8]) -> impl Iterator<Item = u32> + '_ {
bytes.chunks_exact(4).flat_map(TryInto::try_into).map(u32::from_ne_bytes)
}
/// Converts an fst Stream into an HashSet of Strings.
pub fn fst_stream_into_hashset<'f, I, S>(stream: I) -> HashSet<Vec<u8>>
where
I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>,
S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>,
{
let mut hashset = HashSet::new();
let mut stream = stream.into_stream();
while let Some(value) = stream.next() {
hashset.insert(value.to_owned());
}
hashset
}
// Converts an fst Stream into a Vec of Strings.
pub fn fst_stream_into_vec<'f, I, S>(stream: I) -> Vec<String>
where
I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>,
S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>,
{
let mut strings = Vec::new();
let mut stream = stream.into_stream();
while let Some(word) = stream.next() {
let s = std::str::from_utf8(word).unwrap();
strings.push(s.to_owned());
}
strings
}

View File

@ -12,15 +12,18 @@ use crossbeam_channel::{Receiver, Sender};
use log::debug;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use slice_group_by::GroupBy;
use typed_chunk::{write_typed_chunk_into_index, TypedChunk};
pub use self::helpers::{
create_sorter, create_writer, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps,
sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, MergeFn,
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps,
sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn,
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::DocumentBatchReader;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids,
WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst,
@ -57,12 +60,6 @@ impl Default for IndexDocumentsMethod {
}
}
#[derive(Debug, Copy, Clone)]
pub enum WriteMethod {
Append,
GetMergePut,
}
pub struct IndexDocuments<'t, 'u, 'i, 'a, F> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
@ -282,6 +279,9 @@ where
let index_documents_ids = self.index.documents_ids(self.wtxn)?;
let index_is_empty = index_documents_ids.len() == 0;
let mut final_documents_ids = RoaringBitmap::new();
let mut word_pair_proximity_docids = Vec::new();
let mut word_position_docids = Vec::new();
let mut word_docids = Vec::new();
let mut databases_seen = 0;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -289,9 +289,28 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
for typed_chunk in lmdb_writer_rx {
for result in lmdb_writer_rx {
let typed_chunk = match result? {
TypedChunk::WordDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_docids.push(cloneable_chunk);
TypedChunk::WordDocids(chunk)
}
TypedChunk::WordPairProximityDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_pair_proximity_docids.push(cloneable_chunk);
TypedChunk::WordPairProximityDocids(chunk)
}
TypedChunk::WordPositionDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_position_docids.push(cloneable_chunk);
TypedChunk::WordPositionDocids(chunk)
}
otherwise => otherwise,
};
let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunk?, &self.index, self.wtxn, index_is_empty)?;
write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
@ -325,13 +344,25 @@ where
let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
self.execute_prefix_databases()?;
self.execute_prefix_databases(
word_docids,
word_pair_proximity_docids,
word_position_docids,
)?;
Ok(all_documents_ids.len())
}
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_prefix_databases(self) -> Result<()> {
pub fn execute_prefix_databases(
self,
word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
{
// Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT;
@ -353,6 +384,9 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
let previous_words_prefixes_fst =
self.index.words_prefixes_fst(self.wtxn)?.map_data(|cow| cow.into_owned())?;
// Run the words prefixes update operation.
let mut builder = WordsPrefixesFst::new(self.wtxn, self.index);
if let Some(value) = self.config.words_prefix_threshold {
@ -363,6 +397,27 @@ where
}
builder.execute()?;
let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We retrieve the common words between the previous and new prefix word fst.
let common_prefix_fst_words = fst_stream_into_vec(
previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(),
);
let common_prefix_fst_words: Vec<_> = common_prefix_fst_words
.as_slice()
.linear_group_by_key(|x| x.chars().nth(0).unwrap())
.collect();
// We retrieve the newly added words between the previous and new prefix word fst.
let new_prefix_fst_words = fst_stream_into_vec(
current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(),
);
// We compute the set of prefixes that are no more part of the prefix fst.
let del_prefix_fst_words = fst_stream_into_hashset(
previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(),
);
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
@ -375,7 +430,12 @@ where
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute()?;
builder.execute(
word_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -389,7 +449,12 @@ where
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute()?;
builder.execute(
word_pair_proximity_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@ -409,7 +474,12 @@ where
if let Some(value) = self.config.words_positions_min_level_size {
builder.min_level_size(value);
}
builder.execute()?;
builder.execute(
word_position_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {

View File

@ -277,7 +277,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
// consume sorter, in order to free the internal allocation, before creating a new one.
let mut iter = self.sorter.into_merger_iter()?;
let mut iter = self.sorter.into_stream_merger_iter()?;
// Once we have sort and deduplicated the documents we write them into a final file.
let mut final_sorter = create_sorter(
@ -374,16 +374,15 @@ impl<'a, 'i> Transform<'a, 'i> {
});
// We create a final writer to write the new documents in order from the sorter.
let file = tempfile::tempfile()?;
let mut writer = create_writer(
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
file,
)?;
tempfile::tempfile()?,
);
// Once we have written all the documents into the final sorter, we write the documents
// into this writer, extract the file and reset the seek to be able to read it again.
final_sorter.write_into(&mut writer)?;
final_sorter.write_into_stream_writer(&mut writer)?;
let mut documents_file = writer.into_inner()?;
documents_file.seek(SeekFrom::Start(0))?;
@ -424,12 +423,11 @@ impl<'a, 'i> Transform<'a, 'i> {
let documents_count = documents_ids.len() as usize;
// We create a final writer to write the new documents in order from the sorter.
let file = tempfile::tempfile()?;
let mut writer = create_writer(
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
file,
)?;
tempfile::tempfile()?,
);
let mut obkv_buffer = Vec::new();
for result in self.index.documents.iter(wtxn)? {

View File

@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::convert::TryInto;
use std::fs::File;
use std::io;
use heed::types::ByteSlice;
use heed::{BytesDecode, RwTxn};
@ -11,7 +12,7 @@ use super::helpers::{
CursorClonableMmap,
};
use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string};
use crate::update::index_documents::helpers::into_clonable_grenad;
use crate::update::index_documents::helpers::as_cloneable_grenad;
use crate::{
lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
Result,
@ -65,8 +66,9 @@ pub(crate) fn write_typed_chunk_into_index(
},
)?;
}
TypedChunk::Documents(mut obkv_documents_iter) => {
while let Some((key, value)) = obkv_documents_iter.next()? {
TypedChunk::Documents(obkv_documents_iter) => {
let mut cursor = obkv_documents_iter.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
index.documents.remap_types::<ByteSlice, ByteSlice>().put(wtxn, key, value)?;
}
}
@ -85,7 +87,7 @@ pub(crate) fn write_typed_chunk_into_index(
return Ok((documents_ids, is_merged_database))
}
TypedChunk::WordDocids(word_docids_iter) => {
let mut word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?;
let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_iter) }?;
append_entries_into_database(
word_docids_iter.clone(),
&index.word_docids,
@ -97,7 +99,8 @@ pub(crate) fn write_typed_chunk_into_index(
// create fst from word docids
let mut builder = fst::SetBuilder::memory();
while let Some((word, _value)) = word_docids_iter.next()? {
let mut cursor = word_docids_iter.into_cursor()?;
while let Some((word, _value)) = cursor.move_on_next()? {
// This is a lexicographically ordered word position
// we use the key to construct the words fst.
builder.insert(word)?;
@ -146,19 +149,21 @@ pub(crate) fn write_typed_chunk_into_index(
)?;
is_merged_database = true;
}
TypedChunk::FieldIdDocidFacetNumbers(mut fid_docid_facet_number) => {
TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => {
let index_fid_docid_facet_numbers =
index.field_id_docid_facet_f64s.remap_types::<ByteSlice, ByteSlice>();
while let Some((key, value)) = fid_docid_facet_number.next()? {
let mut cursor = fid_docid_facet_number.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
index_fid_docid_facet_numbers.put(wtxn, key, &value)?;
}
}
}
TypedChunk::FieldIdDocidFacetStrings(mut fid_docid_facet_string) => {
TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => {
let index_fid_docid_facet_strings =
index.field_id_docid_facet_strings.remap_types::<ByteSlice, ByteSlice>();
while let Some((key, value)) = fid_docid_facet_string.next()? {
let mut cursor = fid_docid_facet_string.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
index_fid_docid_facet_strings.put(wtxn, key, &value)?;
}
@ -183,11 +188,12 @@ pub(crate) fn write_typed_chunk_into_index(
)?;
is_merged_database = true;
}
TypedChunk::GeoPoints(mut geo_points) => {
TypedChunk::GeoPoints(geo_points) => {
let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default();
let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?;
while let Some((key, value)) = geo_points.next()? {
let mut cursor = geo_points.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
// convert the key back to a u32 (4 bytes)
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
@ -229,7 +235,7 @@ fn merge_cbo_roaring_bitmaps(
/// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database.
fn write_entries_into_database<R, K, V, FS, FM>(
mut data: grenad::Reader<R>,
data: grenad::Reader<R>,
database: &heed::Database<K, V>,
wtxn: &mut RwTxn,
index_is_empty: bool,
@ -237,14 +243,15 @@ fn write_entries_into_database<R, K, V, FS, FM>(
merge_values: FM,
) -> Result<()>
where
R: std::io::Read,
R: io::Read + io::Seek,
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
let mut buffer = Vec::new();
let database = database.remap_types::<ByteSlice, ByteSlice>();
while let Some((key, value)) = data.next()? {
let mut cursor = data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
buffer.clear();
let value = if index_is_empty {
@ -270,7 +277,7 @@ where
/// All provided entries must be ordered.
/// If the index is not empty, write_entries_into_database is called instead.
fn append_entries_into_database<R, K, V, FS, FM>(
mut data: grenad::Reader<R>,
data: grenad::Reader<R>,
database: &heed::Database<K, V>,
wtxn: &mut RwTxn,
index_is_empty: bool,
@ -278,7 +285,7 @@ fn append_entries_into_database<R, K, V, FS, FM>(
merge_values: FM,
) -> Result<()>
where
R: std::io::Read,
R: io::Read + io::Seek,
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
@ -296,7 +303,8 @@ where
let mut buffer = Vec::new();
let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
while let Some((key, value)) = data.next()? {
let mut cursor = data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
buffer.clear();
let value = serialize_value(value, &mut buffer)?;

View File

@ -1,11 +1,10 @@
use std::str;
use std::collections::{HashMap, HashSet};
use fst::Streamer;
use grenad::CompressionType;
use grenad::{CompressionType, MergerBuilder};
use heed::types::ByteSlice;
use crate::update::index_documents::{
create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod,
create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, MergeFn,
};
use crate::{Index, Result};
@ -34,12 +33,13 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
}
#[logging_timer::time("WordPrefixDocids::{}")]
pub fn execute(self) -> Result<()> {
// Clear the word prefix docids database.
self.index.word_prefix_docids.clear(self.wtxn)?;
let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
pub fn execute(
self,
new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(
@ -50,18 +50,60 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
self.max_memory,
);
// We iterate over all the prefixes and retrieve the corresponding docids.
let mut prefix_stream = prefix_fst.stream();
while let Some(bytes) = prefix_stream.next() {
let prefix = str::from_utf8(bytes)?;
let db = self.index.word_docids.remap_data_type::<ByteSlice>();
let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps);
for reader in new_word_docids {
word_docids_merger.push(reader.into_cursor()?);
}
let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?;
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((word, data)) = word_docids_iter.next()? {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
common_prefix_fst_words
.iter()
.find(|prefixes| word.starts_with(&prefixes[0].as_bytes()))
}
};
if let Some(prefixes) = current_prefixes {
for prefix in prefixes.iter() {
if word.starts_with(prefix.as_bytes()) {
match prefixes_cache.get_mut(prefix.as_bytes()) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache.insert(prefix.clone().into(), vec![data.to_owned()]);
}
}
}
}
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
// We fetch the docids associated to the newly added word prefix fst only.
let db = self.index.word_docids.remap_data_type::<ByteSlice>();
for prefix in new_prefix_fst_words {
let prefix = std::str::from_utf8(prefix.as_bytes())?;
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
prefix_docids_sorter.insert(prefix, data)?;
}
}
drop(prefix_fst);
// We remove all the entries that are no more required in this word prefix docids database.
let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data();
while let Some((prefix, _)) = iter.next().transpose()? {
if del_prefix_fst_words.contains(prefix.as_bytes()) {
unsafe { iter.del_current()? };
}
}
drop(iter);
// We finally write the word prefix docids into the LMDB database.
sorter_into_lmdb_database(
@ -69,9 +111,21 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
*self.index.word_prefix_docids.as_polymorph(),
prefix_docids_sorter,
merge_roaring_bitmaps,
WriteMethod::Append,
)?;
Ok(())
}
}
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
for data in data_slices {
sorter.insert(&key, data)?;
}
}
Ok(())
}
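
The grouping loop in `WordPrefixDocids::execute` above relies on the merged word docids arriving in lexicographic order: the common prefixes are pre-grouped by their first letter, and the matching group is only searched again when the current word stops matching it. Here is a simplified sketch of that grouping step, using hypothetical in-memory types instead of the grenad sorter/merger machinery:

```rust
use std::collections::HashMap;

// Group sorted (word, docid) entries under the prefixes they start with.
// `prefix_groups` contains the common prefixes grouped by first letter,
// e.g. [["s", "se"], ["t"]], mirroring `linear_group_by_key` in the PR.
fn group_under_prefixes(
    sorted_word_docids: &[(String, u32)],
    prefix_groups: &[&[String]],
) -> HashMap<String, Vec<u32>> {
    let mut cache: HashMap<String, Vec<u32>> = HashMap::new();
    let mut current: Option<&&[String]> = None;
    for (word, docid) in sorted_word_docids {
        // Only look the group up again when the word no longer starts with the
        // group's first prefix, which is cheap thanks to the ordering.
        current = match current.take() {
            Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes),
            _ => prefix_groups.iter().find(|prefixes| word.starts_with(&prefixes[0])),
        };
        if let Some(prefixes) = current {
            for prefix in prefixes.iter() {
                if word.starts_with(prefix) {
                    cache.entry(prefix.clone()).or_default().push(*docid);
                }
            }
        }
    }
    cache
}

fn main() {
    let words = vec![("sea".to_string(), 1), ("seat".to_string(), 2), ("tea".to_string(), 3)];
    let s_group = vec!["s".to_string(), "se".to_string()];
    let t_group = vec!["t".to_string()];
    let groups: Vec<&[String]> = vec![s_group.as_slice(), t_group.as_slice()];

    let grouped = group_under_prefixes(&words, &groups);
    assert_eq!(grouped["se"], vec![1, 2]);
    assert_eq!(grouped["t"], vec![3]);
}
```

In milli, the cached values are serialized roaring bitmaps and the cache is regularly flushed into a grenad sorter by `write_prefixes_in_sorter`, but the control flow is the same.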

View File

@ -1,15 +1,16 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use fst::IntoStreamer;
use grenad::CompressionType;
use grenad::{CompressionType, MergerBuilder};
use heed::types::ByteSlice;
use heed::BytesDecode;
use log::debug;
use slice_group_by::GroupBy;
use crate::update::index_documents::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, MergeFn, WriteMethod,
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap,
MergeFn,
};
use crate::{Index, Result};
use crate::{Index, Result, StrStrU8Codec};
pub struct WordPrefixPairProximityDocids<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
@ -61,12 +62,26 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
}
#[logging_timer::time("WordPrefixPairProximityDocids::{}")]
pub fn execute(self) -> Result<()> {
pub fn execute(
self,
new_word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;
let new_prefix_fst_words: Vec<_> =
new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect();
// We retrieve and merge the created word pair proximities docids entries
// for the newly added documents.
let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
for reader in new_word_pair_proximity_docids {
wppd_merger.push(reader.into_cursor()?);
}
let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?;
// Here we create a sorter akin to the previous one.
let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
merge_cbo_roaring_bitmaps,
self.chunk_compression_type,
@ -75,52 +90,29 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
self.max_memory,
);
let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
let prefix_fst_keys = prefix_fst.into_stream().into_strs()?;
let prefix_fst_keys: Vec<_> =
prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect();
let mut db =
self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>().iter(self.wtxn)?;
// We compute the prefix docids associated with the common prefixes between
// the old and new word prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some(((w1, w2, prox), data)) = db.next().transpose()? {
while let Some((key, data)) = wppd_iter.next()? {
let (w1, w2, prox) = StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
if prox > self.max_proximity {
continue;
}
current_prefixes = match current_prefixes.take() {
Some(prefixes) if w2.starts_with(&prefixes[0]) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
)?;
prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0]))
}
};
if let Some(prefixes) = current_prefixes {
buffer.clear();
buffer.extend_from_slice(w1.as_bytes());
buffer.push(0);
for prefix in prefixes.iter() {
if prefix.len() <= self.max_prefix_length && w2.starts_with(prefix) {
buffer.truncate(w1.len() + 1);
buffer.extend_from_slice(prefix.as_bytes());
buffer.push(prox);
match prefixes_cache.get_mut(&buffer) {
Some(value) => value.push(data),
None => {
prefixes_cache.insert(buffer.clone(), vec![data]);
}
}
}
}
}
insert_current_prefix_data_in_sorter(
&mut buffer,
&mut current_prefixes,
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
common_prefix_fst_words,
self.max_prefix_length,
w1,
w2,
prox,
data,
)?;
}
write_prefixes_in_sorter(
@ -128,16 +120,63 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
&mut word_prefix_pair_proximity_docids_sorter,
)?;
drop(prefix_fst);
drop(db);
// We compute the prefix docids associated with the newly added prefixes
// in the new word prefix fst.
let mut db_iter =
self.index.word_pair_proximity_docids.remap_data_type::<ByteSlice>().iter(self.wtxn)?;
// We finally write the word prefix pair proximity docids into the LMDB database.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some(((w1, w2, prox), data)) = db_iter.next().transpose()? {
if prox > self.max_proximity {
continue;
}
insert_current_prefix_data_in_sorter(
&mut buffer,
&mut current_prefixes,
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
&new_prefix_fst_words,
self.max_prefix_length,
w1,
w2,
prox,
data,
)?;
}
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut word_prefix_pair_proximity_docids_sorter,
)?;
drop(db_iter);
// All of the word prefix pairs in the database that have a w2
// that is contained in the `suppr_pw` set must be removed as well.
let mut iter = self
.index
.word_prefix_pair_proximity_docids
.remap_data_type::<ByteSlice>()
.iter_mut(self.wtxn)?;
while let Some(((_, w2, _), _)) = iter.next().transpose()? {
if del_prefix_fst_words.contains(w2.as_bytes()) {
// Delete this entry as the w2 prefix is no more in the words prefix fst.
unsafe { iter.del_current()? };
}
}
drop(iter);
// We finally write and merge the new word prefix pair proximity docids
// in the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_pair_proximity_docids.as_polymorph(),
word_prefix_pair_proximity_docids_sorter,
merge_cbo_roaring_bitmaps,
WriteMethod::Append,
)?;
Ok(())
@ -145,7 +184,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
}
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<&[u8]>>,
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
@ -156,3 +195,51 @@ fn write_prefixes_in_sorter(
Ok(())
}
/// Computes the current prefix based on the previous and the currently iterated value
/// i.e. w1, w2, prox. It also makes sure to follow the `max_prefix_length` setting.
///
/// Uses the current prefixes values to insert the associated data i.e. RoaringBitmap,
/// into the sorter that will, later, be inserted in the LMDB database.
fn insert_current_prefix_data_in_sorter<'a>(
buffer: &mut Vec<u8>,
current_prefixes: &mut Option<&'a &'a [String]>,
prefixes_cache: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
word_prefix_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>,
prefix_fst_keys: &'a [&'a [std::string::String]],
max_prefix_length: usize,
w1: &str,
w2: &str,
prox: u8,
data: &[u8],
) -> Result<()> {
*current_prefixes = match current_prefixes.take() {
Some(prefixes) if w2.starts_with(&prefixes[0]) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(prefixes_cache, word_prefix_pair_proximity_docids_sorter)?;
prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0]))
}
};
if let Some(prefixes) = current_prefixes {
buffer.clear();
buffer.extend_from_slice(w1.as_bytes());
buffer.push(0);
for prefix in prefixes.iter() {
if prefix.len() <= max_prefix_length && w2.starts_with(prefix) {
buffer.truncate(w1.len() + 1);
buffer.extend_from_slice(prefix.as_bytes());
buffer.push(prox);
match prefixes_cache.get_mut(buffer.as_slice()) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]);
}
}
}
}
}
Ok(())
}

View File

@ -1,17 +1,18 @@
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::{cmp, str};
use fst::Streamer;
use grenad::CompressionType;
use grenad::{CompressionType, MergerBuilder};
use heed::types::ByteSlice;
use heed::BytesEncode;
use heed::{BytesDecode, BytesEncode};
use log::debug;
use crate::error::SerializationError;
use crate::heed_codec::StrBEU32Codec;
use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
use crate::update::index_documents::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod,
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap,
MergeFn,
};
use crate::{Index, Result};
@ -54,12 +55,16 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
}
#[logging_timer::time("WordPrefixPositionDocids::{}")]
pub fn execute(self) -> Result<()> {
pub fn execute(
self,
new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
debug!("Computing and writing the word levels positions docids into LMDB on disk...");
self.index.word_prefix_position_docids.clear(self.wtxn)?;
let mut word_prefix_positions_docids_sorter = create_sorter(
let mut prefix_position_docids_sorter = create_sorter(
merge_cbo_roaring_bitmaps,
self.chunk_compression_type,
self.chunk_compression_level,
@ -67,39 +72,105 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
self.max_memory,
);
// We insert the word prefix position and
// corresponds to the word-prefix position where the prefixes appears
// in the prefix FST previously constructed.
let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
for reader in new_word_position_docids {
word_position_docids_merger.push(reader.into_cursor()?);
}
let mut word_position_docids_iter =
word_position_docids_merger.build().into_stream_merger_iter()?;
// We fetch all the new common prefixes between the previous and new prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = word_position_docids_iter.next()? {
let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_position_docids_sorter,
)?;
common_prefix_fst_words.iter().find(|prefixes| word.starts_with(&prefixes[0]))
}
};
if let Some(prefixes) = current_prefixes {
for prefix in prefixes.iter() {
if word.starts_with(prefix) {
buffer.clear();
buffer.extend_from_slice(prefix.as_bytes());
buffer.extend_from_slice(&pos.to_be_bytes());
match prefixes_cache.get_mut(&buffer) {
Some(value) => value.push(data.to_owned()),
None => {
prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]);
}
}
}
}
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_position_docids_sorter)?;
// We fetch the docids associated to the newly added word prefix fst only.
let db = self.index.word_position_docids.remap_data_type::<ByteSlice>();
// iter over all prefixes in the prefix fst.
let mut word_stream = prefix_fst.stream();
while let Some(prefix_bytes) = word_stream.next() {
let prefix = str::from_utf8(prefix_bytes).map_err(|_| {
for prefix_bytes in new_prefix_fst_words {
let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| {
SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) }
})?;
// iter over all lines of the DB where the key is prefixed by the current prefix.
let mut iter = db
let iter = db
.remap_key_type::<ByteSlice>()
.prefix_iter(self.wtxn, &prefix_bytes)?
.prefix_iter(self.wtxn, prefix_bytes.as_bytes())?
.remap_key_type::<StrBEU32Codec>();
while let Some(((_word, pos), data)) = iter.next().transpose()? {
let key = (prefix, pos);
let bytes = StrBEU32Codec::bytes_encode(&key).unwrap();
word_prefix_positions_docids_sorter.insert(bytes, data)?;
for result in iter {
let ((word, pos), data) = result?;
if word.starts_with(prefix) {
let key = (prefix, pos);
let bytes = StrBEU32Codec::bytes_encode(&key).unwrap();
prefix_position_docids_sorter.insert(bytes, data)?;
}
}
}
// We remove all the entries that are no more required in this word prefix position
// docids database.
let mut iter =
self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data();
while let Some(((prefix, _), _)) = iter.next().transpose()? {
if del_prefix_fst_words.contains(prefix.as_bytes()) {
unsafe { iter.del_current()? };
}
}
drop(iter);
// We finally write all the word prefix position docids into the LMDB database.
sorter_into_lmdb_database(
self.wtxn,
*self.index.word_prefix_position_docids.as_polymorph(),
word_prefix_positions_docids_sorter,
prefix_position_docids_sorter,
merge_cbo_roaring_bitmaps,
WriteMethod::Append,
)?;
Ok(())
}
}
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
for data in data_slices {
sorter.insert(&key, data)?;
}
}
Ok(())
}