From aed8c69bcb94b83ccb2957b741584f908c22d094 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?=
Date: Tue, 19 Jul 2022 09:57:28 +0200
Subject: [PATCH] Refactor indexation of the "facet-id-exists-docids" database

The idea is to directly create a sorted and merged list of bitmaps, in
the form of a BTreeMap<FieldId, RoaringBitmap>, instead of creating a
grenad::Reader where the keys are field ids and the values are docids.

Then we send that BTreeMap to the writer that handles TypedChunks,
which inserts its content into the database.
---
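Note: the merge step added to data_from_obkv_documents below boils down
to unioning the per-chunk maps with `|=`. Here is a minimal,
self-contained sketch of that step; it assumes only that FieldId is u16
(as in milli), and the name merge_exists_maps is illustrative, not part
of the patch:

    use std::collections::BTreeMap;
    use roaring::RoaringBitmap;

    type FieldId = u16;

    fn merge_exists_maps(
        chunks: Vec<BTreeMap<FieldId, RoaringBitmap>>,
    ) -> BTreeMap<FieldId, RoaringBitmap> {
        let mut all = BTreeMap::new();
        for chunk in chunks {
            for (field_id, docids) in chunk {
                // `|=` on RoaringBitmap is a set union: a document id is
                // kept if any chunk saw the field in that document.
                *all.entry(field_id).or_default() |= docids;
            }
        }
        all
    }

Because a BTreeMap iterates over its keys in sorted order, the map sent
as TypedChunk::FieldIdFacetExistsDocids reaches the writer already
sorted by field id, which is what makes the append fast path added in
typed_chunk.rs valid.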
 .../extract/extract_fid_docid_facet_values.rs | 23 +++---
 .../src/update/index_documents/extract/mod.rs | 35 +++++----
 .../src/update/index_documents/typed_chunk.rs | 73 ++++++++++++++++---
 3 files changed, 92 insertions(+), 39 deletions(-)

diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
index 6d66a7a64..368378792 100644
--- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
@@ -1,16 +1,16 @@
-use std::collections::HashSet;
+use std::collections::{BTreeMap, HashSet};
 use std::convert::TryInto;
 use std::fs::File;
 use std::io;
 use std::mem::size_of;
 
 use heed::zerocopy::AsBytes;
+use roaring::RoaringBitmap;
 use serde_json::Value;
 
 use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
 use crate::error::InternalError;
 use crate::facet::value_encoding::f64_into_bytes;
-use crate::update::index_documents::merge_cbo_roaring_bitmaps;
 use crate::{DocumentId, FieldId, Result, BEU32};
 
 /// Extracts the facet values of each faceted field of each document.
@@ -22,7 +22,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     faceted_fields: &HashSet<FieldId>,
-) -> Result<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
+) -> Result<(grenad::Reader<File>, grenad::Reader<File>, BTreeMap<FieldId, RoaringBitmap>)> {
     let max_memory = indexer.max_memory_by_thread();
 
     let mut fid_docid_facet_numbers_sorter = create_sorter(
@@ -30,7 +30,7 @@
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
         indexer.max_nb_chunks,
-        max_memory.map(|m| m / 3),
+        max_memory.map(|m| m / 2),
     );
 
     let mut fid_docid_facet_strings_sorter = create_sorter(
@@ -38,16 +38,10 @@
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
         indexer.max_nb_chunks,
-        max_memory.map(|m| m / 3),
+        max_memory.map(|m| m / 2),
     );
 
-    let mut fid_docid_facet_exists_sorter = create_sorter(
-        merge_cbo_roaring_bitmaps,
-        indexer.chunk_compression_type,
-        indexer.chunk_compression_level,
-        indexer.max_nb_chunks,
-        max_memory.map(|m| m / 3),
-    );
+    let mut facet_exists_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
 
     let mut key_buffer = Vec::new();
     let mut cursor = obkv_documents.into_cursor()?;
@@ -65,7 +59,8 @@
                 // Here, we know already that the document must be added to the “field id exists” database
                 let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
                 let document = BEU32::from(document).get();
-                fid_docid_facet_exists_sorter.insert(&key_buffer, document.to_ne_bytes())?;
+
+                facet_exists_docids.entry(field_id).or_default().insert(document);
 
                 // For the other extraction tasks, prefix the key with the field_id and the document_id
                 key_buffer.extend_from_slice(&docid_bytes);
@@ -99,7 +94,7 @@
     Ok((
         sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?,
        sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?,
-        sorter_into_reader(fid_docid_facet_exists_sorter, indexer)?,
+        facet_exists_docids,
     ))
 }
 
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index bb695a99f..76d968919 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -8,12 +8,13 @@ mod extract_word_docids;
 mod extract_word_pair_proximity_docids;
 mod extract_word_position_docids;
 
-use std::collections::HashSet;
+use std::collections::{BTreeMap, HashSet};
 use std::fs::File;
 
 use crossbeam_channel::Sender;
 use log::debug;
 use rayon::prelude::*;
+use roaring::RoaringBitmap;
 
 use self::extract_docid_word_positions::extract_docid_word_positions;
 use self::extract_facet_number_docids::extract_facet_number_docids;
@@ -72,12 +73,24 @@ pub(crate) fn data_from_obkv_documents(
 
     let (
         docid_word_positions_chunks,
-        (
-            docid_fid_facet_numbers_chunks,
-            (docid_fid_facet_strings_chunks, docid_fid_facet_exists_chunks),
-        ),
+        (docid_fid_facet_numbers_chunks, (docid_fid_facet_strings_chunks, facet_exists_docids)),
     ) = result?;
 
+    // merge facet_exists_docids hashmaps and send them as a typed chunk
+    {
+        let lmdb_writer_sx = lmdb_writer_sx.clone();
+        rayon::spawn(move || {
+            let mut all = BTreeMap::default();
+            for facet_exists_docids in facet_exists_docids {
+                for (field_id, docids) in facet_exists_docids {
+                    let docids0 = all.entry(field_id).or_default();
+                    *docids0 |= docids;
+                }
+            }
+            let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(all)));
+        });
+    }
+
     spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
         docid_word_positions_chunks.clone(),
         indexer.clone(),
@@ -141,12 +154,6 @@
         "field-id-facet-number-docids",
     );
 
-    // spawn extraction task for field-id-facet-exists-docids
-    rayon::spawn(move || {
-        let reader = docid_fid_facet_exists_chunks.merge(merge_cbo_roaring_bitmaps, &indexer);
-        let _ = lmdb_writer_sx.send(reader.map(TypedChunk::FieldIdFacetExistsDocids));
-    });
-
     Ok(())
 }
 
@@ -221,7 +228,7 @@ fn send_and_extract_flattened_documents_data(
     grenad::Reader<CursorClonableMmap>,
     (
         grenad::Reader<CursorClonableMmap>,
-        (grenad::Reader<CursorClonableMmap>, grenad::Reader<File>),
+        (grenad::Reader<CursorClonableMmap>, BTreeMap<FieldId, RoaringBitmap>),
     ),
 )> {
     let flattened_documents_chunk =
@@ -266,7 +273,7 @@
                 let (
                     docid_fid_facet_numbers_chunk,
                     docid_fid_facet_strings_chunk,
-                    docid_fid_facet_exists_chunk,
+                    facet_exists_docids,
                 ) = extract_fid_docid_facet_values(
                     flattened_documents_chunk.clone(),
                     indexer.clone(),
@@ -291,7 +298,7 @@
 
                 Ok((
                     docid_fid_facet_numbers_chunk,
-                    (docid_fid_facet_strings_chunk, docid_fid_facet_exists_chunk),
+                    (docid_fid_facet_strings_chunk, facet_exists_docids),
                 ))
             },
         );
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index e501e5efd..e1fd8f98d 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -1,11 +1,12 @@
 use std::borrow::Cow;
+use std::collections::BTreeMap;
 use std::convert::TryInto;
 use std::fs::File;
 use std::io;
 
 use grenad::MergerBuilder;
 use heed::types::ByteSlice;
-use heed::{BytesDecode, RwTxn};
+use heed::{BytesDecode, BytesEncode, RwTxn};
 use roaring::RoaringBitmap;
 
 use super::helpers::{
@@ -16,8 +17,8 @@ use super::{ClonableMmap, MergeFn};
 use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string};
 use crate::update::index_documents::helpers::as_cloneable_grenad;
 use crate::{
-    lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index,
-    Result,
+    error, lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, FieldId,
+    GeoPoint, Index, Result, BEU16,
 };
 
 pub(crate) enum TypedChunk {
@@ -35,7 +36,7 @@
     WordPairProximityDocids(grenad::Reader<File>),
     FieldIdFacetStringDocids(grenad::Reader<File>),
     FieldIdFacetNumberDocids(grenad::Reader<File>),
-    FieldIdFacetExistsDocids(grenad::Reader<File>),
+    FieldIdFacetExistsDocids(BTreeMap<FieldId, RoaringBitmap>),
     GeoPoints(grenad::Reader<File>),
 }
 
@@ -147,16 +148,14 @@ pub(crate) fn write_typed_chunk_into_index(
             )?;
             is_merged_database = true;
         }
-        TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids_iter) => {
-            append_entries_into_database(
-                facet_id_exists_docids_iter,
+        TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
+            write_sorted_iterator_into_database(
+                facet_id_exists_docids.into_iter().map(|(k, v)| (BEU16::new(k), v)),
                 &index.facet_id_exists_docids,
+                "facet-id-exists-docids",
                 wtxn,
-                index_is_empty,
-                |value, _buffer| Ok(value),
                 merge_cbo_roaring_bitmaps,
-            )
-            .unwrap();
+            )?;
             is_merged_database = true;
         }
         TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => {
@@ -270,6 +269,58 @@ fn merge_cbo_roaring_bitmaps(
     )?)
 }
 
+fn write_sorted_iterator_into_database<Iter, Key, Value, KeyCodec, ValueCodec, Merge>(
+    mut iterator: Iter,
+    database: &heed::Database<KeyCodec, ValueCodec>,
+    database_name: &'static str,
+    wtxn: &mut RwTxn,
+    merge_values: Merge,
+) -> Result<()>
+where
+    for<'a> KeyCodec: BytesEncode<'a, EItem = Key>,
+    for<'a> ValueCodec: BytesEncode<'a, EItem = Value> + BytesDecode<'a, DItem = Value>,
+    Iter: Iterator<Item = (Key, Value)>,
+    Merge: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
+{
+    if database.is_empty(wtxn)? {
+        let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
+
+        while let Some((key, value)) = iterator.next() {
+            let key = KeyCodec::bytes_encode(&key)
+                .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
+            if valid_lmdb_key(&key) {
+                let value = ValueCodec::bytes_encode(&value)
+                    .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
+                unsafe { database.append(&key, &value)? };
+            }
+        }
+
+        Ok(())
+    } else {
+        let database = database.remap_types::<ByteSlice, ByteSlice>();
+        let mut buffer = Vec::new();
+        while let Some((key, value)) = iterator.next() {
+            let key = KeyCodec::bytes_encode(&key)
+                .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
+            if valid_lmdb_key(&key) {
+                let value = ValueCodec::bytes_encode(&value)
+                    .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?;
+                let value = match database.get(wtxn, &key)? {
+                    Some(prev_value) => {
+                        merge_values(&value, &prev_value, &mut buffer)?;
+                        &buffer[..]
+                    }
+                    None => &value,
+                };
+
+                database.put(wtxn, &key, value)?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
 /// Write provided entries in database using serialize_value function.
 /// merge_values function is used if an entry already exist in the database.
 fn write_entries_into_database(
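
Note on write_sorted_iterator_into_database: when the target database is
empty, the encoded keys arrive in strictly increasing order (the
iterator comes from a BTreeMap), so LMDB's append fast path is safe to
use; otherwise each incoming value is merged with the value already
stored under the same key. A standalone sketch of that branch logic,
with a plain BTreeMap standing in for the LMDB database (the names here
are illustrative, not milli's API):

    use std::collections::BTreeMap;

    fn write_sorted<M>(
        db: &mut BTreeMap<Vec<u8>, Vec<u8>>,
        entries: impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
        merge: M,
    ) where
        M: Fn(&[u8], &[u8]) -> Vec<u8>,
    {
        if db.is_empty() {
            // Fast path: the keys are already sorted, so inserting them
            // in order plays the role of LMDB's `append`.
            db.extend(entries);
        } else {
            for (key, new_value) in entries {
                let value = match db.get(&key) {
                    // The key already exists: combine old and new values.
                    Some(prev) => merge(&new_value, prev),
                    None => new_value,
                };
                db.insert(key, value);
            }
        }
    }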