diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs index f93ee735e..0a4b7db45 100644 --- a/milli/src/update/facet/bulk.rs +++ b/milli/src/update/facet/bulk.rs @@ -1,10 +1,11 @@ +use std::borrow::Cow; use std::cmp; use std::fs::File; use std::num::NonZeroUsize; use grenad::CompressionType; -use heed::types::ByteSlice; -use heed::{BytesEncode, Error, RoTxn}; +use heed::types::{ByteSlice, DecodeIgnore}; +use heed::{BytesDecode, BytesEncode, Error, RoTxn, RwTxn}; use log::debug; use roaring::RoaringBitmap; use time::OffsetDateTime; @@ -14,21 +15,27 @@ use crate::facet::FacetType; use crate::heed_codec::facet::new::{ FacetGroupValue, FacetGroupValueCodec, FacetKey, FacetKeyCodec, MyByteSlice, }; -use crate::update::index_documents::{create_writer, write_into_lmdb_database, writer_into_reader}; -use crate::{FieldId, Index, Result}; +use crate::update::index_documents::{ + create_writer, valid_lmdb_key, write_into_lmdb_database, writer_into_reader, +}; +use crate::{CboRoaringBitmapCodec, FieldId, Index, Result}; pub struct FacetsUpdateBulk<'i> { index: &'i Index, database: heed::Database, FacetGroupValueCodec>, - pub(crate) chunk_compression_type: CompressionType, - pub(crate) chunk_compression_level: Option, level_group_size: usize, min_level_size: usize, facet_type: FacetType, + // None if level 0 does not need to be updated + new_data: Option>, } impl<'i> FacetsUpdateBulk<'i> { - pub fn new(index: &'i Index, facet_type: FacetType) -> FacetsUpdateBulk<'i> { + pub fn new( + index: &'i Index, + facet_type: FacetType, + new_data: grenad::Reader, + ) -> FacetsUpdateBulk<'i> { FacetsUpdateBulk { index, database: match facet_type { @@ -39,11 +46,31 @@ impl<'i> FacetsUpdateBulk<'i> { index.facet_id_f64_docids.remap_key_type::>() } }, - chunk_compression_type: CompressionType::None, - chunk_compression_level: None, level_group_size: 4, min_level_size: 5, facet_type, + new_data: Some(new_data), + } + } + + pub fn new_not_updating_level_0( + index: &'i Index, + facet_type: FacetType, + ) -> FacetsUpdateBulk<'i> { + FacetsUpdateBulk { + index, + database: match facet_type { + FacetType::String => { + index.facet_id_string_docids.remap_key_type::>() + } + FacetType::Number => { + index.facet_id_f64_docids.remap_key_type::>() + } + }, + level_group_size: 4, + min_level_size: 5, + facet_type, + new_data: None, } } @@ -70,39 +97,84 @@ impl<'i> FacetsUpdateBulk<'i> { } #[logging_timer::time("FacetsUpdateBulk::{}")] - pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { + pub fn execute(mut self, wtxn: &mut heed::RwTxn) -> Result<()> { self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; + debug!("Computing and writing the facet values levels docids into LMDB on disk..."); + // We get the faceted fields to be able to create the facet levels. let faceted_fields = self.index.faceted_fields_ids(wtxn)?.clone(); - debug!("Computing and writing the facet values levels docids into LMDB on disk..."); - for &field_id in faceted_fields.iter() { self.clear_levels(wtxn, field_id)?; } + self.update_level0(wtxn)?; - let mut nested_wtxn = self.index.env.nested_write_txn(wtxn)?; + // let mut nested_wtxn = self.index.env.nested_write_txn(wtxn)?; for &field_id in faceted_fields.iter() { - let (level_readers, all_docids) = - self.compute_levels_for_field_id(field_id, &nested_wtxn)?; + let (level_readers, all_docids) = self.compute_levels_for_field_id(field_id, &wtxn)?; - let put_docids_fn = match self.facet_type { - FacetType::Number => Index::put_number_faceted_documents_ids, - FacetType::String => Index::put_string_faceted_documents_ids, - }; - put_docids_fn(&self.index, &mut nested_wtxn, field_id, &all_docids)?; + self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &all_docids)?; for level_reader in level_readers { - // TODO: append instead of write with merge - write_into_lmdb_database( - &mut nested_wtxn, - *self.database.as_polymorph(), - level_reader, - |_, _| { - Err(InternalError::IndexingMergingKeys { process: "facet string levels" })? - }, - )?; + let mut cursor = level_reader.into_cursor()?; + while let Some((k, v)) = cursor.move_on_next()? { + let key = FacetKeyCodec::::bytes_decode(k).unwrap(); + let value = FacetGroupValueCodec::bytes_decode(v).unwrap(); + println!("inserting {key:?} {value:?}"); + + self.database.remap_types::().put(wtxn, k, v)?; + } + } + } + + Ok(()) + } + + fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> { + let new_data = match self.new_data.take() { + Some(x) => x, + None => return Ok(()), + }; + if self.database.is_empty(wtxn)? { + let mut buffer = Vec::new(); + let mut database = self.database.iter_mut(wtxn)?.remap_types::(); + let mut cursor = new_data.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { + if valid_lmdb_key(key) { + buffer.clear(); + // the group size for level 0 + buffer.push(1); + // then we extend the buffer with the docids bitmap + buffer.extend_from_slice(value); + unsafe { database.append(key, &buffer)? }; + } + } + } else { + let mut buffer = Vec::new(); + let database = self.database.remap_types::(); + + let mut cursor = new_data.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { + if valid_lmdb_key(key) { + buffer.clear(); + // the group size for level 0 + buffer.push(1); + // then we extend the buffer with the docids bitmap + match database.get(wtxn, key)? { + Some(prev_value) => { + let old_bitmap = &prev_value[1..]; + CboRoaringBitmapCodec::merge_into( + &[Cow::Borrowed(value), Cow::Borrowed(old_bitmap)], + &mut buffer, + )?; + } + None => { + buffer.extend_from_slice(value); + } + }; + database.put(wtxn, key, &buffer)?; + } } } @@ -114,16 +186,14 @@ impl<'i> FacetsUpdateBulk<'i> { field_id: FieldId, txn: &RoTxn, ) -> Result<(Vec>, RoaringBitmap)> { - let algo = FacetsUpdateBulkAlgorithm { + // TODO: first check whether there is anything in level 0 + let algo = ComputeHigherLevels { rtxn: txn, db: &self.database, field_id, level_group_size: self.level_group_size, min_level_size: self.min_level_size, - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, }; - // TODO: first check whether there is anything in level 0 let mut all_docids = RoaringBitmap::new(); let subwriters = algo.compute_higher_levels(32, &mut |bitmaps, _| { @@ -138,16 +208,14 @@ impl<'i> FacetsUpdateBulk<'i> { } } -pub struct FacetsUpdateBulkAlgorithm<'t> { +struct ComputeHigherLevels<'t> { rtxn: &'t heed::RoTxn<'t>, db: &'t heed::Database, FacetGroupValueCodec>, - chunk_compression_type: CompressionType, - chunk_compression_level: Option, field_id: u16, level_group_size: usize, min_level_size: usize, } -impl<'t> FacetsUpdateBulkAlgorithm<'t> { +impl<'t> ComputeHigherLevels<'t> { fn read_level_0( &self, handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>, @@ -215,11 +283,7 @@ impl<'t> FacetsUpdateBulkAlgorithm<'t> { // once we have computed `level_group_size` elements, we give the left bound // of those elements, and their bitmaps, to the level above - let mut cur_writer = create_writer( - self.chunk_compression_type, - self.chunk_compression_level, - tempfile::tempfile()?, - ); + let mut cur_writer = create_writer(CompressionType::None, None, tempfile::tempfile()?); let mut cur_writer_len = 0; let mut group_sizes = vec![]; @@ -259,7 +323,7 @@ impl<'t> FacetsUpdateBulkAlgorithm<'t> { Ok(()) })?; // don't forget to insert the leftover elements into the writer as well - if !bitmaps.is_empty() && cur_writer_len >= self.level_group_size * self.min_level_size { + if !bitmaps.is_empty() && cur_writer_len >= self.min_level_size { let left_bound = left_bounds.first().unwrap(); handle_group(&bitmaps, left_bound)?; for ((bitmap, left_bound), group_size) in @@ -274,7 +338,7 @@ impl<'t> FacetsUpdateBulkAlgorithm<'t> { cur_writer_len += 1; } } - if cur_writer_len > self.level_group_size * self.min_level_size { + if cur_writer_len > self.min_level_size { sub_writers.push(writer_into_reader(cur_writer)?); } return Ok(sub_writers); @@ -315,9 +379,9 @@ mod tests { documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone()); } let documents = documents_batch_reader_from_objects(documents); - + dbg!(); index.add_documents(documents).unwrap(); - + dbg!(); db_snap!(index, facet_id_f64_docids, name); }; diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs index ecde3a248..00964a406 100644 --- a/milli/src/update/facet/mod.rs +++ b/milli/src/update/facet/mod.rs @@ -1,2 +1,90 @@ +use std::{collections::HashMap, fs::File}; + +use grenad::CompressionType; +use heed::BytesDecode; +use roaring::RoaringBitmap; + +use crate::{ + facet::FacetType, + heed_codec::facet::new::{FacetGroupValueCodec, FacetKeyCodec, MyByteSlice}, + CboRoaringBitmapCodec, FieldId, Index, Result, +}; + +use super::{FacetsUpdateBulk, FacetsUpdateIncremental}; + pub mod bulk; pub mod incremental; + +pub struct FacetsUpdate<'i> { + index: &'i Index, + database: heed::Database, FacetGroupValueCodec>, + level_group_size: u8, + max_level_group_size: u8, + min_level_size: u8, + facet_type: FacetType, + new_data: grenad::Reader, +} +impl<'i> FacetsUpdate<'i> { + pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader) -> Self { + let database = match facet_type { + FacetType::String => { + index.facet_id_string_docids.remap_key_type::>() + } + FacetType::Number => { + index.facet_id_f64_docids.remap_key_type::>() + } + }; + Self { + index, + database, + level_group_size: 4, + max_level_group_size: 8, + min_level_size: 5, + facet_type, + new_data, + } + } + + // /// The number of elements from the level below that are represented by a single element in the level above + // /// + // /// This setting is always greater than or equal to 2. + // pub fn level_group_size(&mut self, value: u8) -> &mut Self { + // self.level_group_size = std::cmp::max(value, 2); + // self + // } + + // /// The minimum number of elements that a level is allowed to have. + // pub fn min_level_size(&mut self, value: u8) -> &mut Self { + // self.min_level_size = std::cmp::max(value, 1); + // self + // } + + pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> { + if self.database.is_empty(wtxn)? { + let bulk_update = FacetsUpdateBulk::new(self.index, self.facet_type, self.new_data); + bulk_update.execute(wtxn)?; + } else { + let indexer = FacetsUpdateIncremental::new(self.database); + + let mut new_faceted_docids = HashMap::::default(); + + let mut cursor = self.new_data.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { + let key = + FacetKeyCodec::::bytes_decode(key).ok_or(heed::Error::Encoding)?; + let docids = + CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?; + indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?; + *new_faceted_docids.entry(key.field_id).or_default() |= docids; + } + + for (field_id, new_docids) in new_faceted_docids { + let mut docids = + self.index.faceted_documents_ids(wtxn, field_id, self.facet_type)?; + docids |= new_docids; + self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &docids)?; + } + } + Ok(()) + } +} diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/default/facet_id_f64_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/default/facet_id_f64_docids.hash.snap index b990c31c7..960843592 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/default/facet_id_f64_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/default/facet_id_f64_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -947949d1a5c9c4e895c89fba46cbba68 +07718df52f8463335fb8fefcd3ae01f4 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_large_levels/facet_id_f64_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_large_levels/facet_id_f64_docids.hash.snap index b990c31c7..960843592 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_large_levels/facet_id_f64_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_large_levels/facet_id_f64_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -947949d1a5c9c4e895c89fba46cbba68 +07718df52f8463335fb8fefcd3ae01f4 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_small_levels/facet_id_f64_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_small_levels/facet_id_f64_docids.hash.snap index b990c31c7..960843592 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_small_levels/facet_id_f64_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/large_groups_small_levels/facet_id_f64_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -947949d1a5c9c4e895c89fba46cbba68 +07718df52f8463335fb8fefcd3ae01f4 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_large_levels/facet_id_f64_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_large_levels/facet_id_f64_docids.hash.snap index b990c31c7..960843592 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_large_levels/facet_id_f64_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_large_levels/facet_id_f64_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -947949d1a5c9c4e895c89fba46cbba68 +07718df52f8463335fb8fefcd3ae01f4 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_small_levels/facet_id_f64_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_small_levels/facet_id_f64_docids.hash.snap index b990c31c7..960843592 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_small_levels/facet_id_f64_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/small_groups_small_levels/facet_id_f64_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -947949d1a5c9c4e895c89fba46cbba68 +07718df52f8463335fb8fefcd3ae01f4 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/tiny_groups_tiny_levels/facet_id_f64_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/tiny_groups_tiny_levels/facet_id_f64_docids.hash.snap index b990c31c7..960843592 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/tiny_groups_tiny_levels/facet_id_f64_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_number/tiny_groups_tiny_levels/facet_id_f64_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -947949d1a5c9c4e895c89fba46cbba68 +07718df52f8463335fb8fefcd3ae01f4 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/default/facet_id_string_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/default/facet_id_string_docids.hash.snap index 7ed43424a..574a3c393 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/default/facet_id_string_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/default/facet_id_string_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -5ce8009d3eb023e4b9c0a6e7fa4e6262 +3e6a91b3c54c614a4787224ac4278ed3 diff --git a/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/tiny_groups_tiny_levels/facet_id_string_docids.hash.snap b/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/tiny_groups_tiny_levels/facet_id_string_docids.hash.snap index 7ed43424a..574a3c393 100644 --- a/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/tiny_groups_tiny_levels/facet_id_string_docids.hash.snap +++ b/milli/src/update/facet/snapshots/bulk.rs/test_facets_string/tiny_groups_tiny_levels/facet_id_string_docids.hash.snap @@ -1,4 +1,4 @@ --- source: milli/src/update/facet/bulk.rs --- -5ce8009d3eb023e4b9c0a6e7fa4e6262 +3e6a91b3c54c614a4787224ac4278ed3 diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index df98724da..16784bd92 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -1,5 +1,4 @@ use std::borrow::Cow; -use std::collections::HashMap; use std::convert::TryInto; use std::fs::File; use std::io; @@ -14,12 +13,12 @@ use super::helpers::{ valid_lmdb_key, CursorClonableMmap, }; use super::{ClonableMmap, MergeFn}; -use crate::heed_codec::facet::new::{FacetKeyCodec, MyByteSlice}; +use crate::facet::FacetType; +use crate::update::facet::FacetsUpdate; use crate::update::index_documents::helpers::as_cloneable_grenad; -use crate::update::FacetsUpdateIncremental; use crate::{ - lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, - Index, Result, + lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index, + Result, }; pub(crate) enum TypedChunk { @@ -138,78 +137,14 @@ pub(crate) fn write_typed_chunk_into_index( )?; is_merged_database = true; } - TypedChunk::FieldIdFacetNumberDocids(facet_id_f64_docids_iter) => { - // merge cbo roaring bitmaps is not the correct merger because the data in the DB - // is FacetGroupValue and not RoaringBitmap - // so I need to create my own merging function - - // facet_id_string_docids is encoded as: - // key: FacetKeyCodec - // value: CboRoaringBitmapCodec - // basically - - // TODO: a condition saying "if I have more than 1/50th of the DB to add, - // then I do it in bulk, otherwise I do it incrementally". But instead of 1/50, - // it is a ratio I determine empirically - - // for now I only do it incrementally, to see if things work - let indexer = FacetsUpdateIncremental::new( - index.facet_id_f64_docids.remap_key_type::>(), - ); - - let mut new_faceted_docids = HashMap::::default(); - - let mut cursor = facet_id_f64_docids_iter.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let key = - FacetKeyCodec::::bytes_decode(key).ok_or(heed::Error::Encoding)?; - let docids = - CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?; - indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?; - *new_faceted_docids.entry(key.field_id).or_default() |= docids; - } - for (field_id, new_docids) in new_faceted_docids { - let mut docids = index.number_faceted_documents_ids(wtxn, field_id)?; - docids |= new_docids; - index.put_number_faceted_documents_ids(wtxn, field_id, &docids)?; - } - + TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids_iter) => { + let indexer = FacetsUpdate::new(index, FacetType::Number, facet_id_number_docids_iter); + indexer.execute(wtxn)?; is_merged_database = true; } - TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids) => { - // merge cbo roaring bitmaps is not the correct merger because the data in the DB - // is FacetGroupValue and not RoaringBitmap - // so I need to create my own merging function - - // facet_id_string_docids is encoded as: - // key: FacetKeyCodec - // value: CboRoaringBitmapCodec - // basically - - // TODO: a condition saying "if I have more than 1/50th of the DB to add, - // then I do it in bulk, otherwise I do it incrementally". But instead of 1/50, - // it is a ratio I determine empirically - - // for now I only do it incrementally, to see if things work - let indexer = FacetsUpdateIncremental::new( - index.facet_id_string_docids.remap_key_type::>(), - ); - let mut new_faceted_docids = HashMap::::default(); - - let mut cursor = facet_id_string_docids.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let key = - FacetKeyCodec::::bytes_decode(key).ok_or(heed::Error::Encoding)?; - let docids = - CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?; - indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?; - *new_faceted_docids.entry(key.field_id).or_default() |= docids; - } - for (field_id, new_docids) in new_faceted_docids { - let mut docids = index.string_faceted_documents_ids(wtxn, field_id)?; - docids |= new_docids; - index.put_string_faceted_documents_ids(wtxn, field_id, &docids)?; - } + TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids_iter) => { + let indexer = FacetsUpdate::new(index, FacetType::String, facet_id_string_docids_iter); + indexer.execute(wtxn)?; is_merged_database = true; } TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {