Avoid using too much memory when indexing facet-exists-docids

This commit is contained in:
Loïc Lecrenier 2022-07-19 14:42:35 +02:00
parent d0eee5ff7a
commit 1506683705
3 changed files with 40 additions and 76 deletions

View File

@ -5,13 +5,15 @@ use std::io;
use std::mem::size_of; use std::mem::size_of;
use heed::zerocopy::AsBytes; use heed::zerocopy::AsBytes;
use heed::BytesEncode;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
use crate::error::InternalError; use crate::error::InternalError;
use crate::facet::value_encoding::f64_into_bytes; use crate::facet::value_encoding::f64_into_bytes;
use crate::{DocumentId, FieldId, Result, BEU32}; use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32};
/// Extracts the facet values of each faceted field of each document. /// Extracts the facet values of each faceted field of each document.
/// ///
@ -22,7 +24,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>, obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters, indexer: GrenadParameters,
faceted_fields: &HashSet<FieldId>, faceted_fields: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>, BTreeMap<FieldId, RoaringBitmap>)> { ) -> Result<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter( let mut fid_docid_facet_numbers_sorter = create_sorter(
@ -91,10 +93,21 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
} }
} }
let mut facet_exists_docids_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
for (fid, bitmap) in facet_exists_docids.into_iter() {
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
}
let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?;
Ok(( Ok((
sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?, sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?,
sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?, sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?,
facet_exists_docids, facet_exists_docids_reader,
)) ))
} }

View File

@ -8,13 +8,12 @@ mod extract_word_docids;
mod extract_word_pair_proximity_docids; mod extract_word_pair_proximity_docids;
mod extract_word_position_docids; mod extract_word_position_docids;
use std::collections::{BTreeMap, HashSet}; use std::collections::HashSet;
use std::fs::File; use std::fs::File;
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use log::debug; use log::debug;
use rayon::prelude::*; use rayon::prelude::*;
use roaring::RoaringBitmap;
use self::extract_docid_word_positions::extract_docid_word_positions; use self::extract_docid_word_positions::extract_docid_word_positions;
use self::extract_facet_number_docids::extract_facet_number_docids; use self::extract_facet_number_docids::extract_facet_number_docids;
@ -73,21 +72,25 @@ pub(crate) fn data_from_obkv_documents(
let ( let (
docid_word_positions_chunks, docid_word_positions_chunks,
(docid_fid_facet_numbers_chunks, (docid_fid_facet_strings_chunks, facet_exists_docids)), (
docid_fid_facet_numbers_chunks,
(docid_fid_facet_strings_chunks, facet_exists_docids_chunks),
),
) = result?; ) = result?;
// merge facet_exists_docids hashmaps and send them as a typed chunk // merge facet_exists_docids and send them as a typed chunk
{ {
let lmdb_writer_sx = lmdb_writer_sx.clone(); let lmdb_writer_sx = lmdb_writer_sx.clone();
rayon::spawn(move || { rayon::spawn(move || {
let mut all = BTreeMap::default(); debug!("merge {} database", "facet-id-exists-docids");
for facet_exists_docids in facet_exists_docids { match facet_exists_docids_chunks.merge(merge_cbo_roaring_bitmaps, &indexer) {
for (field_id, docids) in facet_exists_docids { Ok(reader) => {
let docids0 = all.entry(field_id).or_default(); let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(reader)));
*docids0 |= docids; }
Err(e) => {
let _ = lmdb_writer_sx.send(Err(e));
} }
} }
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(all)));
}); });
} }
@ -228,7 +231,7 @@ fn send_and_extract_flattened_documents_data(
grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>,
( (
grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>,
(grenad::Reader<CursorClonableMmap>, BTreeMap<FieldId, RoaringBitmap>), (grenad::Reader<CursorClonableMmap>, grenad::Reader<File>),
), ),
)> { )> {
let flattened_documents_chunk = let flattened_documents_chunk =
@ -273,7 +276,7 @@ fn send_and_extract_flattened_documents_data(
let ( let (
docid_fid_facet_numbers_chunk, docid_fid_facet_numbers_chunk,
docid_fid_facet_strings_chunk, docid_fid_facet_strings_chunk,
facet_exists_docids, fid_facet_exists_docids_chunk,
) = extract_fid_docid_facet_values( ) = extract_fid_docid_facet_values(
flattened_documents_chunk.clone(), flattened_documents_chunk.clone(),
indexer.clone(), indexer.clone(),
@ -298,7 +301,7 @@ fn send_and_extract_flattened_documents_data(
Ok(( Ok((
docid_fid_facet_numbers_chunk, docid_fid_facet_numbers_chunk,
(docid_fid_facet_strings_chunk, facet_exists_docids), (docid_fid_facet_strings_chunk, fid_facet_exists_docids_chunk),
)) ))
}, },
); );

View File

@ -1,12 +1,11 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::BTreeMap;
use std::convert::TryInto; use std::convert::TryInto;
use std::fs::File; use std::fs::File;
use std::io; use std::io;
use grenad::MergerBuilder; use grenad::MergerBuilder;
use heed::types::ByteSlice; use heed::types::ByteSlice;
use heed::{BytesDecode, BytesEncode, RwTxn}; use heed::{BytesDecode, RwTxn};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::helpers::{ use super::helpers::{
@ -17,8 +16,8 @@ use super::{ClonableMmap, MergeFn};
use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string}; use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string};
use crate::update::index_documents::helpers::as_cloneable_grenad; use crate::update::index_documents::helpers::as_cloneable_grenad;
use crate::{ use crate::{
error, lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, FieldId, lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
GeoPoint, Index, Result, BEU16, Result,
}; };
pub(crate) enum TypedChunk { pub(crate) enum TypedChunk {
@ -36,7 +35,7 @@ pub(crate) enum TypedChunk {
WordPairProximityDocids(grenad::Reader<File>), WordPairProximityDocids(grenad::Reader<File>),
FieldIdFacetStringDocids(grenad::Reader<File>), FieldIdFacetStringDocids(grenad::Reader<File>),
FieldIdFacetNumberDocids(grenad::Reader<File>), FieldIdFacetNumberDocids(grenad::Reader<File>),
FieldIdFacetExistsDocids(BTreeMap<FieldId, RoaringBitmap>), FieldIdFacetExistsDocids(grenad::Reader<File>),
GeoPoints(grenad::Reader<File>), GeoPoints(grenad::Reader<File>),
} }
@ -149,11 +148,12 @@ pub(crate) fn write_typed_chunk_into_index(
is_merged_database = true; is_merged_database = true;
} }
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => { TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
write_sorted_iterator_into_database( append_entries_into_database(
facet_id_exists_docids.into_iter().map(|(k, v)| (BEU16::new(k), v)), facet_id_exists_docids,
&index.facet_id_exists_docids, &index.facet_id_exists_docids,
"facet-id-exists-docids",
wtxn, wtxn,
index_is_empty,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps, merge_cbo_roaring_bitmaps,
)?; )?;
is_merged_database = true; is_merged_database = true;
@ -269,58 +269,6 @@ fn merge_cbo_roaring_bitmaps(
)?) )?)
} }
fn write_sorted_iterator_into_database<Iter, Key, Value, KeyCodec, ValueCodec, Merge>(
mut iterator: Iter,
database: &heed::Database<KeyCodec, ValueCodec>,
database_name: &'static str,
wtxn: &mut RwTxn,
merge_values: Merge,
) -> Result<()>
where
for<'a> KeyCodec: BytesEncode<'a, EItem = Key>,
for<'a> ValueCodec: BytesEncode<'a, EItem = Value> + BytesDecode<'a, DItem = Value>,
Iter: Iterator<Item = (Key, Value)>,
Merge: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
{
if database.is_empty(wtxn)? {
let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
while let Some((key, value)) = iterator.next() {
let key = KeyCodec::bytes_encode(&key)
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
if valid_lmdb_key(&key) {
let value = ValueCodec::bytes_encode(&value)
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
unsafe { database.append(&key, &value)? };
}
}
Ok(())
} else {
let database = database.remap_types::<ByteSlice, ByteSlice>();
let mut buffer = Vec::new();
while let Some((key, value)) = iterator.next() {
let key = KeyCodec::bytes_encode(&key)
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
if valid_lmdb_key(&key) {
let value = ValueCodec::bytes_encode(&value)
.ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
let value = match database.get(wtxn, &key)? {
Some(prev_value) => {
merge_values(&value, &prev_value, &mut buffer)?;
&buffer[..]
}
None => &value,
};
database.put(wtxn, &key, value)?;
}
}
Ok(())
}
}
/// Write provided entries in database using serialize_value function. /// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database. /// merge_values function is used if an entry already exist in the database.
fn write_entries_into_database<R, K, V, FS, FM>( fn write_entries_into_database<R, K, V, FS, FM>(