Try to make facet indexing incremental

Loïc Lecrenier 2022-09-01 08:17:27 +02:00 committed by Loïc Lecrenier
parent d30c89e345
commit 85824ee203
6 changed files with 55 additions and 35 deletions


@ -138,7 +138,7 @@ impl<'t, 'b, 'bitmap> FacetRangeSearch<'t, 'b, 'bitmap> {
let should_skip = {
match self.left {
Bound::Included(left) => left >= next_key.left_bound,
Bound::Excluded(left) => left >= next_key.left_bound, // TODO: use > instead?
Bound::Excluded(left) => left >= next_key.left_bound,
Bound::Unbounded => false,
}
};
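For context on the "use > instead?" question in this hunk: the Included/Excluded distinction it hinges on is whether the bound value itself is accepted. A minimal, self-contained illustration of the two bound kinds (not milli code; the hunk itself asks the subtler question of which comparison lets the search skip a whole group of keys):

use std::ops::Bound;

// Included(3) accepts 3, Excluded(3) rejects it; only the strict comparison
// expresses an exclusive lower bound when testing a single value.
fn above_lower_bound(bound: &Bound<u32>, v: u32) -> bool {
    match bound {
        Bound::Included(b) => v >= *b,
        Bound::Excluded(b) => v > *b,
        Bound::Unbounded => true,
    }
}

fn main() {
    assert!(above_lower_bound(&Bound::Included(3), 3));
    assert!(!above_lower_bound(&Bound::Excluded(3), 3));
}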


@ -2,7 +2,7 @@ use std::collections::btree_map::Entry;
use fst::IntoStreamer;
use heed::types::{ByteSlice, Str};
use heed::Database;
use heed::{Database, RwTxn};
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -446,6 +446,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
facet_id_f64_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
&self.to_delete_docids,
fields_ids_map.clone(),
Index::put_number_faceted_documents_ids,
)?;
remove_docids_from_facet_id_docids(
self.wtxn,
@ -453,6 +454,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
&self.to_delete_docids,
fields_ids_map.clone(),
Index::put_string_faceted_documents_ids,
)?;
// We delete the document ids that are under the facet field id values.
remove_docids_from_facet_id_exists_docids(
@ -614,6 +616,7 @@ fn remove_docids_from_facet_id_docids<'a>(
db: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
to_remove: &RoaringBitmap,
fields_ids_map: FieldsIdsMap,
put_faceted_docids_in_main: fn(&Index, &mut RwTxn, FieldId, &RoaringBitmap) -> heed::Result<()>,
) -> Result<()> {
let mut modified = false;
for field_id in fields_ids_map.ids() {
@ -643,7 +646,7 @@ fn remove_docids_from_facet_id_docids<'a>(
if !modified {
return Ok(());
}
let builder = FacetsUpdateBulk::new(index, db);
let builder = FacetsUpdateBulk::new(index, db, put_faceted_docids_in_main);
builder.execute(wtxn)?;
Ok(())
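The change above threads the "store the faceted document ids in the main database" step through as a plain fn pointer, so the same helper serves both the number and the string facet databases. A self-contained sketch of the pattern, with milli's Index, heed::RwTxn, and FieldId stood in by hypothetical simplified types:

// Simplified stand-ins for the real milli types.
struct Index;
struct Txn;
type FieldId = u16;

impl Index {
    fn put_number_faceted_documents_ids(&self, _wtxn: &mut Txn, field_id: FieldId) {
        println!("store number-faceted docids for field {field_id}");
    }
    fn put_string_faceted_documents_ids(&self, _wtxn: &mut Txn, field_id: FieldId) {
        println!("store string-faceted docids for field {field_id}");
    }
}

// The helper receives the storage step as a fn pointer, like
// `put_faceted_docids_in_main` above (minus the docids and error handling).
fn remove_docids_from_facet_id_docids(
    index: &Index,
    wtxn: &mut Txn,
    field_id: FieldId,
    put_faceted_docids_in_main: fn(&Index, &mut Txn, FieldId),
) {
    put_faceted_docids_in_main(index, wtxn, field_id);
}

fn main() {
    let index = Index;
    let mut wtxn = Txn;
    // Inherent methods coerce to fn pointers, so each call site just names one:
    remove_docids_from_facet_id_docids(&index, &mut wtxn, 0, Index::put_number_faceted_documents_ids);
    remove_docids_from_facet_id_docids(&index, &mut wtxn, 1, Index::put_string_faceted_documents_ids);
}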


@ -6,7 +6,7 @@ use crate::update::index_documents::{create_writer, write_into_lmdb_database, wr
use crate::{FieldId, Index, Result};
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn};
use heed::{BytesEncode, Error, RoTxn, RwTxn};
use log::debug;
use roaring::RoaringBitmap;
use std::cmp;
@ -21,12 +21,19 @@ pub struct FacetsUpdateBulk<'i> {
pub(crate) chunk_compression_level: Option<u32>,
level_group_size: usize,
min_level_size: usize,
put_faceted_docids_in_main: fn(&Index, &mut RwTxn, FieldId, &RoaringBitmap) -> heed::Result<()>,
}
impl<'i> FacetsUpdateBulk<'i> {
pub fn new(
index: &'i Index,
database: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
put_faceted_docids_in_main: fn(
&Index,
&mut RwTxn,
FieldId,
&RoaringBitmap,
) -> heed::Result<()>,
) -> FacetsUpdateBulk<'i> {
FacetsUpdateBulk {
index,
@ -35,6 +42,7 @@ impl<'i> FacetsUpdateBulk<'i> {
chunk_compression_level: None,
level_group_size: 4,
min_level_size: 5,
put_faceted_docids_in_main,
}
}
@ -78,8 +86,12 @@ impl<'i> FacetsUpdateBulk<'i> {
let (level_readers, all_docids) =
self.compute_levels_for_field_id(field_id, &nested_wtxn)?;
// TODO: this will need to be an argument to Facets as well
self.index.put_string_faceted_documents_ids(&mut nested_wtxn, field_id, &all_docids)?;
(self.put_faceted_docids_in_main)(
&self.index,
&mut nested_wtxn,
field_id,
&all_docids,
)?;
for level_reader in level_readers {
// TODO: append instead of write with merge
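Note the parentheses in `(self.put_faceted_docids_in_main)(…)` above: without them, Rust would resolve `self.put_faceted_docids_in_main(…)` as a method call instead of a call through the field. A minimal illustration (not milli code):

// Calling a function stored in a struct field requires parenthesizing the
// field access; `u.callback(1)` would look for a method named `callback`.
struct Updater {
    callback: fn(u32) -> u32,
}

fn main() {
    let u = Updater { callback: |x| x + 1 };
    assert_eq!((u.callback)(1), 2);
}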


@ -32,6 +32,10 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
// document_id_bytes is a big-endian u32, but merge_cbo_roaring_bitmap works
// with native-endian u32s; that mismatch is a potential problem.
let (document_id_bytes, normalized_value_bytes) =
try_split_array_at::<_, 4>(bytes).unwrap();
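To make the endianness concern concrete: the extractor writes document ids big-endian (presumably so the keys sort correctly), and a merger that decoded the same four bytes as little-endian would reconstruct a different id. A small self-contained check:

fn main() {
    let docid: u32 = 1;
    // What the extractor writes: big-endian bytes, [0, 0, 0, 1].
    let bytes = docid.to_be_bytes();
    assert_eq!(u32::from_be_bytes(bytes), 1);
    // The same bytes misread as little-endian give a completely different id,
    // which is the mismatch the comment above worries about.
    assert_eq!(u32::from_le_bytes(bytes), 16_777_216);
}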


@ -34,7 +34,6 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::UserError;
use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice};
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
self, FacetsUpdateBulk, IndexerConfig, UpdateIndexingStep, WordPrefixDocids,
@ -431,23 +430,6 @@ where
// Merged databases have already been indexed; we start from this count.
let mut databases_seen = MERGED_DATABASE_COUNT;
// Run the facets update operation.
for facet_db in [
(&self.index.facet_id_string_docids).remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
(&self.index.facet_id_f64_docids).remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
] {
let mut builder = FacetsUpdateBulk::new(self.index, facet_db);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
if let Some(value) = self.config.facet_level_group_size {
builder.level_group_size(value);
}
if let Some(value) = self.config.facet_min_level_size {
builder.min_level_size(value);
}
builder.execute(self.wtxn)?;
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,


@ -13,7 +13,9 @@ use super::helpers::{
valid_lmdb_key, CursorClonableMmap,
};
use super::{ClonableMmap, MergeFn};
use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice};
use crate::update::index_documents::helpers::as_cloneable_grenad;
use crate::update::FacetsUpdateIncremental;
use crate::{
lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
Result,
@ -146,6 +148,34 @@ pub(crate) fn write_typed_chunk_into_index(
)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids) => {
// merge_cbo_roaring_bitmaps is not the correct merger here because the data in
// the DB is FacetGroupValue, not RoaringBitmap, so a dedicated merging function
// is needed.
// facet_id_string_docids is encoded as:
// key: FacetKeyCodec<StrRefCodec>
// value: CboRoaringBitmapCodec
// TODO: add a condition along the lines of "if more than 1/50th of the DB has
// to be added, do it in bulk, otherwise do it incrementally", where 1/50 is a
// ratio to be determined empirically.
// For now, everything is done incrementally, to check that the approach works.
let builder = FacetsUpdateIncremental::new(
index.facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
);
let mut cursor = facet_id_string_docids.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let key =
FacetKeyCodec::<MyByteSlice>::bytes_decode(key).ok_or(heed::Error::Encoding)?;
let value =
CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
builder.insert(wtxn, key.field_id, key.left_bound, &value)?;
}
is_merged_database = true;
}
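The TODO in this hunk sketches a dispatch between the bulk and incremental writers based on how much of the database an update touches. A hypothetical sketch of that condition (the 1/50 threshold and the function are illustrative, not milli code):

// Hypothetical dispatch between bulk and incremental facet indexing; the
// 1/50 ratio is the placeholder the TODO says must be tuned empirically.
const BULK_RATIO: f64 = 1.0 / 50.0;

fn should_use_bulk(new_entries: u64, db_len: u64) -> bool {
    db_len == 0 || (new_entries as f64) / (db_len as f64) > BULK_RATIO
}

fn main() {
    assert!(should_use_bulk(100, 1_000)); // 10% of the DB: rewrite the levels in bulk
    assert!(!should_use_bulk(1, 1_000)); // 0.1%: insert incrementally
}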
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
append_entries_into_database(
facet_id_exists_docids,
@ -188,17 +218,6 @@ pub(crate) fn write_typed_chunk_into_index(
}
}
}
TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids) => {
// facet_id_string_docids contains what the extractor produced for it,
// i.e. (FacetKey { field_id, level: 0, left_bound }, docids: RoaringBitmap).
// Now we need to either:
// 1. incrementally add the key/docids pairs into the DB, or
// 2. add the keys/docids into level 0 and then call Facets::execute.
// The choice between the two should be determined by their performance
// characteristics.
is_merged_database = true;
}
TypedChunk::GeoPoints(geo_points) => {
let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default();
let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?;