From 85824ee203a3f6c99a0335c9a11c275cb6dc37f9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?=
Date: Thu, 1 Sep 2022 08:17:27 +0200
Subject: [PATCH] Try to make facet indexing incremental

---
 milli/src/search/facet/facet_range_search.rs  |  2 +-
 milli/src/update/delete_documents.rs          |  7 +++-
 milli/src/update/facet/bulk.rs                | 18 ++++++--
 .../extract/extract_facet_string_docids.rs    |  4 ++
 milli/src/update/index_documents/mod.rs       | 18 --------
 .../src/update/index_documents/typed_chunk.rs | 41 ++++++++++++++-----
 6 files changed, 55 insertions(+), 35 deletions(-)

diff --git a/milli/src/search/facet/facet_range_search.rs b/milli/src/search/facet/facet_range_search.rs
index 7e7c5e713..523b3853c 100644
--- a/milli/src/search/facet/facet_range_search.rs
+++ b/milli/src/search/facet/facet_range_search.rs
@@ -138,7 +138,7 @@ impl<'t, 'b, 'bitmap> FacetRangeSearch<'t, 'b, 'bitmap> {
         let should_skip = {
             match self.left {
                 Bound::Included(left) => left >= next_key.left_bound,
-                Bound::Excluded(left) => left >= next_key.left_bound, // TODO: use > instead?
+                Bound::Excluded(left) => left >= next_key.left_bound,
                 Bound::Unbounded => false,
             }
         };
diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs
index 1d1745d82..bb18ed80f 100644
--- a/milli/src/update/delete_documents.rs
+++ b/milli/src/update/delete_documents.rs
@@ -2,7 +2,7 @@ use std::collections::btree_map::Entry;
 
 use fst::IntoStreamer;
 use heed::types::{ByteSlice, Str};
-use heed::Database;
+use heed::{Database, RwTxn};
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -446,6 +446,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
             facet_id_f64_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
             &self.to_delete_docids,
             fields_ids_map.clone(),
+            Index::put_number_faceted_documents_ids,
         )?;
         remove_docids_from_facet_id_docids(
             self.wtxn,
@@ -453,6 +454,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
             facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
             &self.to_delete_docids,
             fields_ids_map.clone(),
+            Index::put_string_faceted_documents_ids,
         )?;
         // We delete the documents ids that are under the facet field id values.
         remove_docids_from_facet_id_exists_docids(
@@ -614,6 +616,7 @@ fn remove_docids_from_facet_id_docids<'a>(
     db: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
     to_remove: &RoaringBitmap,
     fields_ids_map: FieldsIdsMap,
+    put_faceted_docids_in_main: fn(&Index, &mut RwTxn, FieldId, &RoaringBitmap) -> heed::Result<()>,
 ) -> Result<()> {
     let mut modified = false;
     for field_id in fields_ids_map.ids() {
@@ -643,7 +646,7 @@ fn remove_docids_from_facet_id_docids<'a>(
     if !modified {
         return Ok(());
     }
-    let builder = FacetsUpdateBulk::new(index, db);
+    let builder = FacetsUpdateBulk::new(index, db, put_faceted_docids_in_main);
     builder.execute(wtxn)?;
 
     Ok(())
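
Note on the change above: `remove_docids_from_facet_id_docids` now takes a plain `fn` pointer so the same helper can write either the number- or the string-faceted documents ids back into the main database, with the caller just naming the setter. Below is a minimal standalone sketch of that pattern; `Index`, `RwTxn`, `RoaringBitmap`, and the two `put_*` methods are simplified stand-ins for the real milli/heed types, not the actual API.

```rust
// Standalone sketch of the callback pattern introduced in the patch above.
// All types here are simplified stand-ins for the real milli/heed types.

type FieldId = u16;

struct RwTxn; // stand-in for heed's write transaction
struct RoaringBitmap; // stand-in for roaring::RoaringBitmap
struct Index;

impl Index {
    fn put_number_faceted_documents_ids(
        &self,
        _wtxn: &mut RwTxn,
        field_id: FieldId,
        _docids: &RoaringBitmap,
    ) -> Result<(), ()> {
        println!("store number-faceted docids for field {field_id}");
        Ok(())
    }

    fn put_string_faceted_documents_ids(
        &self,
        _wtxn: &mut RwTxn,
        field_id: FieldId,
        _docids: &RoaringBitmap,
    ) -> Result<(), ()> {
        println!("store string-faceted docids for field {field_id}");
        Ok(())
    }
}

// Like `remove_docids_from_facet_id_docids` in the patch, this helper is
// generic over *which* main-database setter it writes through.
fn update_main_db(
    index: &Index,
    wtxn: &mut RwTxn,
    field_id: FieldId,
    docids: &RoaringBitmap,
    put_faceted_docids_in_main: fn(&Index, &mut RwTxn, FieldId, &RoaringBitmap) -> Result<(), ()>,
) -> Result<(), ()> {
    put_faceted_docids_in_main(index, wtxn, field_id, docids)
}

fn main() -> Result<(), ()> {
    let (index, mut wtxn, docids) = (Index, RwTxn, RoaringBitmap);
    // A method with a `&self` receiver coerces to `fn(&Index, ...)`, so each
    // call site simply names the setter it wants, as the two hunks above do.
    update_main_db(&index, &mut wtxn, 0, &docids, Index::put_number_faceted_documents_ids)?;
    update_main_db(&index, &mut wtxn, 1, &docids, Index::put_string_faceted_documents_ids)?;
    Ok(())
}
```

A plain `fn` pointer is enough here because the setters capture no state; a capturing closure would not coerce to `fn`, and a trait object would add indirection for no benefit.
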
diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs
index 587dc95ab..b3e932dc2 100644
--- a/milli/src/update/facet/bulk.rs
+++ b/milli/src/update/facet/bulk.rs
@@ -6,7 +6,7 @@ use crate::update::index_documents::{create_writer, write_into_lmdb_database, wr
 use crate::{FieldId, Index, Result};
 use grenad::CompressionType;
 use heed::types::ByteSlice;
-use heed::{BytesEncode, Error, RoTxn};
+use heed::{BytesEncode, Error, RoTxn, RwTxn};
 use log::debug;
 use roaring::RoaringBitmap;
 use std::cmp;
@@ -21,12 +21,19 @@ pub struct FacetsUpdateBulk<'i> {
     pub(crate) chunk_compression_level: Option<u32>,
     level_group_size: usize,
     min_level_size: usize,
+    put_faceted_docids_in_main: fn(&Index, &mut RwTxn, FieldId, &RoaringBitmap) -> heed::Result<()>,
 }
 
 impl<'i> FacetsUpdateBulk<'i> {
     pub fn new(
         index: &'i Index,
         database: heed::Database<FacetKeyCodec<MyByteSlice>, FacetGroupValueCodec>,
+        put_faceted_docids_in_main: fn(
+            &Index,
+            &mut RwTxn,
+            FieldId,
+            &RoaringBitmap,
+        ) -> heed::Result<()>,
     ) -> FacetsUpdateBulk<'i> {
         FacetsUpdateBulk {
             index,
@@ -35,6 +42,7 @@ impl<'i> FacetsUpdateBulk<'i> {
             chunk_compression_level: None,
             level_group_size: 4,
             min_level_size: 5,
+            put_faceted_docids_in_main,
         }
     }
 
@@ -78,8 +86,12 @@ impl<'i> FacetsUpdateBulk<'i> {
         let (level_readers, all_docids) =
             self.compute_levels_for_field_id(field_id, &nested_wtxn)?;
 
-        // TODO: this will need to be an argument to Facets as well
-        self.index.put_string_faceted_documents_ids(&mut nested_wtxn, field_id, &all_docids)?;
+        (self.put_faceted_docids_in_main)(
+            &self.index,
+            &mut nested_wtxn,
+            field_id,
+            &all_docids,
+        )?;
 
         for level_reader in level_readers {
             // TODO: append instead of write with merge
diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
index 51d2df923..0bb83c29a 100644
--- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
@@ -32,6 +32,10 @@ pub fn extract_facet_string_docids(
         let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
         let field_id = FieldId::from_be_bytes(field_id_bytes);
 
+        // document_id_bytes is a big-endian u32
+        // merge_cbo_roaring_bitmap works with native endian u32s
+        // that is a problem, I think
+
         let (document_id_bytes, normalized_value_bytes) =
             try_split_array_at::<_, 4>(bytes).unwrap();
 
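
On the endianness concern flagged in the comment above: a quick self-contained illustration of why decoding a big-endian document id with a native-endian reader goes wrong on little-endian machines. This only demonstrates the general pitfall, not milli's actual merge code.

```rust
// Demonstration of the big-endian vs native-endian pitfall noted above.
// Not milli's actual merge code.

fn main() {
    let document_id: u32 = 42;

    // The extractor writes the id big-endian, so byte-wise key ordering
    // matches numeric ordering.
    let be_bytes = document_id.to_be_bytes();

    // Decoding with the matching byte order round-trips correctly.
    assert_eq!(u32::from_be_bytes(be_bytes), 42);

    // A native-endian decode of the same bytes yields a different id on
    // little-endian hardware: 42u32.swap_bytes() == 704_643_072.
    let native = u32::from_ne_bytes(be_bytes);
    if cfg!(target_endian = "little") {
        assert_eq!(native, 42u32.swap_bytes());
    }
    println!("be: {}, ne: {}", u32::from_be_bytes(be_bytes), native);
}
```
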
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index be9b1e3c5..1ab1bd38d 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -34,7 +34,6 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
 pub use self::transform::{Transform, TransformOutput};
 use crate::documents::{obkv_to_object, DocumentsBatchReader};
 use crate::error::UserError;
-use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice};
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
 use crate::update::{
     self, FacetsUpdateBulk, IndexerConfig, UpdateIndexingStep, WordPrefixDocids,
@@ -431,23 +430,6 @@ where
         // Merged databases are already been indexed, we start from this count;
         let mut databases_seen = MERGED_DATABASE_COUNT;
 
-        // Run the facets update operation.
-        for facet_db in [
-            (&self.index.facet_id_string_docids).remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
-            (&self.index.facet_id_f64_docids).remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
-        ] {
-            let mut builder = FacetsUpdateBulk::new(self.index, facet_db);
-            builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
-            builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
-            if let Some(value) = self.config.facet_level_group_size {
-                builder.level_group_size(value);
-            }
-            if let Some(value) = self.config.facet_min_level_size {
-                builder.min_level_size(value);
-            }
-            builder.execute(self.wtxn)?;
-        }
-
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
             databases_seen,
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index 3c7a78d95..7aa306183 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -13,7 +13,9 @@ use super::helpers::{
     valid_lmdb_key, CursorClonableMmap,
 };
 use super::{ClonableMmap, MergeFn};
+use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice};
 use crate::update::index_documents::helpers::as_cloneable_grenad;
+use crate::update::FacetsUpdateIncremental;
 use crate::{
     lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
     Result,
@@ -146,6 +148,34 @@ pub(crate) fn write_typed_chunk_into_index(
             )?;
             is_merged_database = true;
         }
+        TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids) => {
+            // merge cbo roaring bitmaps is not the correct merger because the data in the DB
+            // is FacetGroupValue and not RoaringBitmap
+            // so I need to create my own merging function
+
+            // facet_id_string_docids is encoded as:
+            // key: FacetKeyCodec
+            // value: CboRoaringBitmapCodec
+            // basically
+
+            // TODO: a condition saying "if I have more than 1/50th of the DB to add,
+            // then I do it in bulk, otherwise I do it incrementally". But instead of 1/50,
+            // it is a ratio I determine empirically
+
+            // for now I only do it incrementally, to see if things work
+            let builder = FacetsUpdateIncremental::new(
+                index.facet_id_string_docids.remap_key_type::<FacetKeyCodec<MyByteSlice>>(),
+            );
+            let mut cursor = facet_id_string_docids.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
+                let key =
+                    FacetKeyCodec::<MyByteSlice>::bytes_decode(key).ok_or(heed::Error::Encoding)?;
+                let value =
+                    CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
+                builder.insert(wtxn, key.field_id, key.left_bound, &value)?;
+            }
+            is_merged_database = true;
+        }
         TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
             append_entries_into_database(
                 facet_id_exists_docids,
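
The TODO in the new arm above sketches a bulk-vs-incremental switch. Here is a tiny illustration of what such a condition could look like, using the 1/50 placeholder ratio from the comment; the real threshold, as the TODO itself says, would have to be determined empirically.

```rust
// Hypothetical version of the condition described in the TODO above.
// The 1/50 ratio is the comment's placeholder, not a measured value.

/// `db_len`: entries already in the facet database;
/// `new_len`: entries in the incoming sorted chunk.
fn should_index_in_bulk(db_len: u64, new_len: u64) -> bool {
    // Once the new data is a sizeable fraction of the database, rebuilding
    // every level from scratch beats many small incremental insertions.
    new_len * 50 >= db_len
}

fn main() {
    // A tiny chunk against a large database: insert incrementally.
    assert!(!should_index_in_bulk(1_000_000, 100));
    // A chunk that is large relative to the database: rebuild in bulk.
    assert!(should_index_in_bulk(1_000_000, 50_000));
}
```
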
@@ -188,17 +218,6 @@ pub(crate) fn write_typed_chunk_into_index(
                 }
             }
         }
-        TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids) => {
-            // facet_id_string_docids contains the thing that the extractor put into it,
-            // so: (FacetKey { field id, level: 0, left_bound } , docids: RoaringBitmap )
-            // now we need to either:
-            // 1. incrementally add the keys/docids pairs into the DB
-            // 2. add the keys/docids into level 0 and then call Facets::execute
-            // the choice of solution should be determined by their performance
-            // characteristics
-
-            is_merged_database = true;
-        }
         TypedChunk::GeoPoints(geo_points) => {
             let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default();
             let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?;
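
The removed placeholder arm above listed two strategies; the new arm earlier in this patch implements the first one, incremental insertion of the extracted key/docids pairs. Below is a standalone toy model of that loop's shape, with a `BTreeMap` standing in for the level-0 database and a toy updater standing in for `FacetsUpdateIncremental`; none of this is milli's real API.

```rust
// Toy model of the incremental path ("option 1" in the removed comment):
// walk the extractor's sorted (field_id, left_bound, docids) pairs and feed
// each into an updater. A BTreeMap plays the role of the level-0 database;
// the real FacetsUpdateIncremental works against LMDB and also maintains
// the upper tree levels.

use std::collections::BTreeMap;

type FieldId = u16;

#[derive(Default)]
struct ToyIncrementalUpdater {
    level0: BTreeMap<(FieldId, Vec<u8>), Vec<u32>>,
}

impl ToyIncrementalUpdater {
    fn insert(&mut self, field_id: FieldId, left_bound: &[u8], docids: &[u32]) {
        // Union the new docids with whatever the key already holds, the way
        // the real updater merges bitmaps instead of overwriting them.
        let entry = self.level0.entry((field_id, left_bound.to_vec())).or_default();
        entry.extend_from_slice(docids);
        entry.sort_unstable();
        entry.dedup();
        // (The real updater would rebalance the upper levels here.)
    }
}

fn main() {
    // Pairs as the extractor would emit them, already sorted by key.
    let chunk: Vec<(FieldId, &[u8], Vec<u32>)> = vec![
        (0, b"blue", vec![1, 4]),
        (0, b"blue", vec![9]),
        (0, b"red", vec![2]),
    ];

    let mut updater = ToyIncrementalUpdater::default();
    for (field_id, left_bound, docids) in chunk {
        updater.insert(field_id, left_bound, &docids);
    }
    assert_eq!(updater.level0[&(0, b"blue".to_vec())], vec![1, 4, 9]);
}
```
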