diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 368378792..cf116e6f5 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -5,13 +5,15 @@ use std::io; use std::mem::size_of; use heed::zerocopy::AsBytes; +use heed::BytesEncode; use roaring::RoaringBitmap; use serde_json::Value; use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; use crate::error::InternalError; use crate::facet::value_encoding::f64_into_bytes; -use crate::{DocumentId, FieldId, Result, BEU32}; +use crate::update::index_documents::{create_writer, writer_into_reader}; +use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32}; /// Extracts the facet values of each faceted field of each document. /// @@ -22,7 +24,7 @@ pub fn extract_fid_docid_facet_values( obkv_documents: grenad::Reader, indexer: GrenadParameters, faceted_fields: &HashSet, -) -> Result<(grenad::Reader, grenad::Reader, BTreeMap)> { +) -> Result<(grenad::Reader, grenad::Reader, grenad::Reader)> { let max_memory = indexer.max_memory_by_thread(); let mut fid_docid_facet_numbers_sorter = create_sorter( @@ -91,10 +93,21 @@ pub fn extract_fid_docid_facet_values( } } + let mut facet_exists_docids_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + for (fid, bitmap) in facet_exists_docids.into_iter() { + let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); + facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; + } + let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?; + Ok(( sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?, sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?, - facet_exists_docids, + facet_exists_docids_reader, )) } diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 76d968919..157886e63 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -8,13 +8,12 @@ mod extract_word_docids; mod extract_word_pair_proximity_docids; mod extract_word_position_docids; -use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use std::fs::File; use crossbeam_channel::Sender; use log::debug; use rayon::prelude::*; -use roaring::RoaringBitmap; use self::extract_docid_word_positions::extract_docid_word_positions; use self::extract_facet_number_docids::extract_facet_number_docids; @@ -73,21 +72,25 @@ pub(crate) fn data_from_obkv_documents( let ( docid_word_positions_chunks, - (docid_fid_facet_numbers_chunks, (docid_fid_facet_strings_chunks, facet_exists_docids)), + ( + docid_fid_facet_numbers_chunks, + (docid_fid_facet_strings_chunks, facet_exists_docids_chunks), + ), ) = result?; - // merge facet_exists_docids hashmaps and send them as a typed chunk + // merge facet_exists_docids and send them as a typed chunk { let lmdb_writer_sx = lmdb_writer_sx.clone(); rayon::spawn(move || { - let mut all = BTreeMap::default(); - for facet_exists_docids in facet_exists_docids { - for (field_id, docids) in facet_exists_docids { - let docids0 = all.entry(field_id).or_default(); - *docids0 |= docids; + debug!("merge {} database", "facet-id-exists-docids"); + match facet_exists_docids_chunks.merge(merge_cbo_roaring_bitmaps, &indexer) { + Ok(reader) => { + let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(reader))); + } + Err(e) => { + let _ = lmdb_writer_sx.send(Err(e)); } } - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(all))); }); } @@ -228,7 +231,7 @@ fn send_and_extract_flattened_documents_data( grenad::Reader, ( grenad::Reader, - (grenad::Reader, BTreeMap), + (grenad::Reader, grenad::Reader), ), )> { let flattened_documents_chunk = @@ -273,7 +276,7 @@ fn send_and_extract_flattened_documents_data( let ( docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk, - facet_exists_docids, + fid_facet_exists_docids_chunk, ) = extract_fid_docid_facet_values( flattened_documents_chunk.clone(), indexer.clone(), @@ -298,7 +301,7 @@ fn send_and_extract_flattened_documents_data( Ok(( docid_fid_facet_numbers_chunk, - (docid_fid_facet_strings_chunk, facet_exists_docids), + (docid_fid_facet_strings_chunk, fid_facet_exists_docids_chunk), )) }, ); diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index e1fd8f98d..5b7b00c21 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -1,12 +1,11 @@ use std::borrow::Cow; -use std::collections::BTreeMap; use std::convert::TryInto; use std::fs::File; use std::io; use grenad::MergerBuilder; use heed::types::ByteSlice; -use heed::{BytesDecode, BytesEncode, RwTxn}; +use heed::{BytesDecode, RwTxn}; use roaring::RoaringBitmap; use super::helpers::{ @@ -17,8 +16,8 @@ use super::{ClonableMmap, MergeFn}; use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string}; use crate::update::index_documents::helpers::as_cloneable_grenad; use crate::{ - error, lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, FieldId, - GeoPoint, Index, Result, BEU16, + lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index, + Result, }; pub(crate) enum TypedChunk { @@ -36,7 +35,7 @@ pub(crate) enum TypedChunk { WordPairProximityDocids(grenad::Reader), FieldIdFacetStringDocids(grenad::Reader), FieldIdFacetNumberDocids(grenad::Reader), - FieldIdFacetExistsDocids(BTreeMap), + FieldIdFacetExistsDocids(grenad::Reader), GeoPoints(grenad::Reader), } @@ -149,11 +148,12 @@ pub(crate) fn write_typed_chunk_into_index( is_merged_database = true; } TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => { - write_sorted_iterator_into_database( - facet_id_exists_docids.into_iter().map(|(k, v)| (BEU16::new(k), v)), + append_entries_into_database( + facet_id_exists_docids, &index.facet_id_exists_docids, - "facet-id-exists-docids", wtxn, + index_is_empty, + |value, _buffer| Ok(value), merge_cbo_roaring_bitmaps, )?; is_merged_database = true; @@ -269,58 +269,6 @@ fn merge_cbo_roaring_bitmaps( )?) } -fn write_sorted_iterator_into_database( - mut iterator: Iter, - database: &heed::Database, - database_name: &'static str, - wtxn: &mut RwTxn, - merge_values: Merge, -) -> Result<()> -where - for<'a> KeyCodec: BytesEncode<'a, EItem = Key>, - for<'a> ValueCodec: BytesEncode<'a, EItem = Value> + BytesDecode<'a, DItem = Value>, - Iter: Iterator, - Merge: Fn(&[u8], &[u8], &mut Vec) -> Result<()>, -{ - if database.is_empty(wtxn)? { - let mut database = database.iter_mut(wtxn)?.remap_types::(); - - while let Some((key, value)) = iterator.next() { - let key = KeyCodec::bytes_encode(&key) - .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?; - if valid_lmdb_key(&key) { - let value = ValueCodec::bytes_encode(&value) - .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?; - unsafe { database.append(&key, &value)? }; - } - } - - Ok(()) - } else { - let database = database.remap_types::(); - let mut buffer = Vec::new(); - while let Some((key, value)) = iterator.next() { - let key = KeyCodec::bytes_encode(&key) - .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?; - if valid_lmdb_key(&key) { - let value = ValueCodec::bytes_encode(&value) - .ok_or(error::SerializationError::Encoding { db_name: Some(database_name) })?; - let value = match database.get(wtxn, &key)? { - Some(prev_value) => { - merge_values(&value, &prev_value, &mut buffer)?; - &buffer[..] - } - None => &value, - }; - - database.put(wtxn, &key, value)?; - } - } - - Ok(()) - } -} - /// Write provided entries in database using serialize_value function. /// merge_values function is used if an entry already exist in the database. fn write_entries_into_database(