From db0c681baef3f0f0e90c5c5feb56cc3ec1509248 Mon Sep 17 00:00:00 2001 From: many Date: Thu, 2 Sep 2021 15:17:52 +0200 Subject: [PATCH] Fix Pr comments --- milli/Cargo.toml | 1 - .../facet_string_level_zero_value_codec.rs | 3 +- .../cbo_roaring_bitmap_codec.rs | 6 ++-- .../extract/extract_fid_docid_facet_values.rs | 9 +++--- .../extract/extract_fid_word_count_docids.rs | 5 ++- .../extract/extract_word_docids.rs | 5 ++- .../extract_word_level_position_docids.rs | 6 +++- .../extract_word_pair_proximity_docids.rs | 31 +++---------------- .../src/update/index_documents/extract/mod.rs | 1 + .../index_documents/helpers/clonable_mmap.rs | 2 ++ .../index_documents/helpers/grenad_helpers.rs | 12 ++++--- .../src/update/index_documents/helpers/mod.rs | 4 --- milli/src/update/index_documents/mod.rs | 3 +- .../src/update/index_documents/typed_chunk.rs | 8 +---- 14 files changed, 38 insertions(+), 58 deletions(-) diff --git a/milli/Cargo.toml b/milli/Cargo.toml index edcec4d5b..8616dcf4a 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dependencies] bstr = "0.2.15" -byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } byteorder = "1.4.2" chrono = { version = "0.4.19", features = ["serde"] } concat-arrays = "0.1.2" diff --git a/milli/src/heed_codec/facet/facet_string_level_zero_value_codec.rs b/milli/src/heed_codec/facet/facet_string_level_zero_value_codec.rs index 914d7c3cd..22031c474 100644 --- a/milli/src/heed_codec/facet/facet_string_level_zero_value_codec.rs +++ b/milli/src/heed_codec/facet/facet_string_level_zero_value_codec.rs @@ -5,6 +5,7 @@ use std::{marker, str}; use crate::error::SerializationError; use crate::heed_codec::RoaringBitmapCodec; use crate::{try_split_array_at, try_split_at, Result}; + pub type FacetStringLevelZeroValueCodec = StringValueCodec; /// A codec that encodes a string in front of a value. @@ -22,7 +23,6 @@ where fn bytes_decode(bytes: &'a [u8]) -> Option { let (string, bytes) = decode_prefix_string(bytes)?; - C::bytes_decode(bytes).map(|item| (string, item)) } } @@ -49,7 +49,6 @@ pub fn decode_prefix_string(value: &[u8]) -> Option<(&str, &[u8])> { let original_length = u16::from_be_bytes(original_length_bytes) as usize; let (string, bytes) = try_split_at(bytes, original_length)?; let string = str::from_utf8(string).ok()?; - Some((string, bytes)) } diff --git a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs index c0e984d44..519997274 100644 --- a/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs +++ b/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs @@ -55,9 +55,9 @@ impl CboRoaringBitmapCodec { /// Merge serialized CboRoaringBitmaps in a buffer. /// - /// if the merged values len is under the threshold, - /// values are directly serialized in the buffer; - /// else a RoaringBitmap is created from the values and is serialized in the buffer. + /// if the merged values length is under the threshold, values are directly + /// serialized in the buffer else a RoaringBitmap is created from the + /// values and is serialized in the buffer. pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec) -> io::Result<()> { let mut roaring = RoaringBitmap::new(); let mut vec = Vec::new(); diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index c46329f61..a1bf0b1e3 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -58,11 +58,12 @@ pub fn extract_fid_docid_facet_values( // insert facet numbers in sorter for number in numbers { key_buffer.truncate(size_of::() + size_of::()); - let value_bytes = f64_into_bytes(number).unwrap(); // invalid float - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); + if let Some(value_bytes) = f64_into_bytes(number) { + key_buffer.extend_from_slice(&value_bytes); + key_buffer.extend_from_slice(&number.to_be_bytes()); - fid_docid_facet_numbers_sorter.insert(&key_buffer, ().as_bytes())?; + fid_docid_facet_numbers_sorter.insert(&key_buffer, ().as_bytes())?; + } } // insert normalized and original facet string in sorter diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index cf698507d..1fbc55714 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -8,6 +8,8 @@ use super::helpers::{ create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, try_split_array_at, GrenadParameters, MergeFn, }; +use crate::error::SerializationError; +use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::proximity::extract_position; use crate::{DocumentId, FieldId, Result}; @@ -36,7 +38,8 @@ pub fn extract_fid_word_count_docids( let mut current_document_id = None; while let Some((key, value)) = docid_word_positions.next()? { - let (document_id_bytes, _word_bytes) = try_split_array_at(key).unwrap(); + let (document_id_bytes, _word_bytes) = try_split_array_at(key) + .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); let curr_document_id = *current_document_id.get_or_insert(document_id); diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 8ca8e39eb..6d99fda44 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -8,6 +8,8 @@ use super::helpers::{ create_sorter, merge_roaring_bitmaps, serialize_roaring_bitmap, sorter_into_reader, try_split_array_at, GrenadParameters, }; +use crate::error::SerializationError; +use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::Result; /// Extracts the word and the documents ids where this word appear. @@ -31,7 +33,8 @@ pub fn extract_word_docids( let mut value_buffer = Vec::new(); while let Some((key, _value)) = docid_word_positions.next()? { - let (document_id_bytes, word_bytes) = try_split_array_at(key).unwrap(); + let (document_id_bytes, word_bytes) = try_split_array_at(key) + .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); let bitmap = RoaringBitmap::from_iter(Some(document_id)); diff --git a/milli/src/update/index_documents/extract/extract_word_level_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_level_position_docids.rs index e099b0b49..04cedf5c7 100644 --- a/milli/src/update/index_documents/extract/extract_word_level_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_level_position_docids.rs @@ -5,7 +5,10 @@ use super::helpers::{ create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, try_split_array_at, GrenadParameters, }; +use crate::error::SerializationError; +use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::{DocumentId, Result}; + /// Extracts the word positions and the documents ids where this word appear. /// /// Returns a grenad reader with the list of extracted words at positions and @@ -27,7 +30,8 @@ pub fn extract_word_level_position_docids( let mut key_buffer = Vec::new(); while let Some((key, value)) = docid_word_positions.next()? { - let (document_id_bytes, word_bytes) = try_split_array_at(key).unwrap(); + let (document_id_bytes, word_bytes) = try_split_array_at(key) + .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = DocumentId::from_be_bytes(document_id_bytes); for position in read_u32_ne_bytes(value) { diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index ce75c319e..982799a65 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -1,15 +1,14 @@ use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap}; use std::fs::File; -use std::time::{Duration, Instant}; use std::{cmp, io, mem, str, vec}; -use log::debug; - use super::helpers::{ create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, try_split_array_at, GrenadParameters, MergeFn, }; +use crate::error::SerializationError; +use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::proximity::{positions_proximity, MAX_DISTANCE}; use crate::{DocumentId, Result}; @@ -32,16 +31,13 @@ pub fn extract_word_pair_proximity_docids( max_memory.map(|m| m / 2), ); - let mut number_of_documents = 0; - let mut total_time_aggregation = Duration::default(); - let mut total_time_grenad_insert = Duration::default(); - // This map is assumed to not consume a lot of memory. let mut document_word_positions_heap = BinaryHeap::new(); let mut current_document_id = None; while let Some((key, value)) = docid_word_positions.next()? { - let (document_id_bytes, word_bytes) = try_split_array_at(key).unwrap(); + let (document_id_bytes, word_bytes) = try_split_array_at(key) + .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); let word = str::from_utf8(word_bytes)?; @@ -52,10 +48,7 @@ pub fn extract_word_pair_proximity_docids( curr_document_id, document_word_positions_heap, &mut word_pair_proximity_docids_sorter, - &mut total_time_aggregation, - &mut total_time_grenad_insert, )?; - number_of_documents += 1; current_document_id = Some(document_id); } @@ -74,18 +67,9 @@ pub fn extract_word_pair_proximity_docids( document_id, document_word_positions_heap, &mut word_pair_proximity_docids_sorter, - &mut total_time_aggregation, - &mut total_time_grenad_insert, )?; } - debug!( - "Number of documents {} - - we took {:02?} to aggregate proximities - - we took {:02?} to grenad insert those proximities", - number_of_documents, total_time_aggregation, total_time_grenad_insert, - ); - sorter_into_reader(word_pair_proximity_docids_sorter, indexer) } @@ -97,10 +81,7 @@ fn document_word_positions_into_sorter<'b>( document_id: DocumentId, mut word_positions_heap: BinaryHeap>>, word_pair_proximity_docids_sorter: &mut grenad::Sorter, - total_time_aggregation: &mut Duration, - total_time_grenad_insert: &mut Duration, ) -> Result<()> { - let before_aggregating = Instant::now(); let mut word_pair_proximity = HashMap::new(); let mut ordered_peeked_word_positions = Vec::new(); while !word_positions_heap.is_empty() { @@ -152,8 +133,6 @@ fn document_word_positions_into_sorter<'b>( } } - *total_time_aggregation += before_aggregating.elapsed(); - let mut key_buffer = Vec::new(); for ((w1, w2), prox) in word_pair_proximity { key_buffer.clear(); @@ -162,9 +141,7 @@ fn document_word_positions_into_sorter<'b>( key_buffer.extend_from_slice(w2.as_bytes()); key_buffer.push(prox as u8); - let before_grenad_insert = Instant::now(); word_pair_proximity_docids_sorter.insert(&key_buffer, &document_id.to_ne_bytes())?; - *total_time_grenad_insert += before_grenad_insert.elapsed(); } Ok(()) diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 04c57b0fa..bb49e3e51 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -225,5 +225,6 @@ fn extract_documents_data( Ok((docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk)) }, ); + Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?)) } diff --git a/milli/src/update/index_documents/helpers/clonable_mmap.rs b/milli/src/update/index_documents/helpers/clonable_mmap.rs index b16c080ff..691d10593 100644 --- a/milli/src/update/index_documents/helpers/clonable_mmap.rs +++ b/milli/src/update/index_documents/helpers/clonable_mmap.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use memmap::Mmap; +/// Wrapper around Mmap allowing to virtualy clone grenad-chunks +/// in a parallel process like the indexing. #[derive(Debug, Clone)] pub struct ClonableMmap { inner: Arc, diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index 9dd261f73..1dfaaf945 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -3,7 +3,6 @@ use std::fs::File; use std::io::{self, Seek, SeekFrom}; use std::time::Instant; -use byte_unit::Byte; use grenad::{CompressionType, MergerIter, Reader, Sorter}; use heed::types::ByteSlice; use log::debug; @@ -113,6 +112,9 @@ impl Default for GrenadParameters { } impl GrenadParameters { + /// This function use the number of threads in the current threadpool to compute the value. + /// This should be called inside of a rayon thread pool, + /// Otherwise, it will take the global number of threads. pub fn max_memory_by_thread(&self) -> Option { self.max_memory.map(|max_memory| max_memory / rayon::current_num_threads()) } @@ -128,7 +130,7 @@ pub fn grenad_obkv_into_chunks( mut reader: grenad::Reader, indexer: GrenadParameters, log_frequency: Option, - documents_chunk_size: Byte, + documents_chunk_size: usize, ) -> Result>>> { let mut document_count = 0; let mut continue_reading = true; @@ -157,7 +159,7 @@ pub fn grenad_obkv_into_chunks( debug!("reached {} chunked documents", document_count); } - if current_chunk_size >= documents_chunk_size.get_bytes() { + if current_chunk_size >= documents_chunk_size as u64 { return writer_into_reader(obkv_documents).map(Some); } } @@ -170,8 +172,8 @@ pub fn grenad_obkv_into_chunks( let result = transposer().transpose(); if result.as_ref().map_or(false, |r| r.is_ok()) { debug!( - "A new chunk of approximately {} has been generated", - documents_chunk_size.get_appropriate_unit(true), + "A new chunk of approximately {:.2} MiB has been generated", + documents_chunk_size as f64 / 1024.0 / 1024.0, ); } result diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index baacb0a1b..3f38d4f25 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -40,10 +40,6 @@ where Some((head, tail)) } -// pub fn pretty_thousands, T: fmt::Display>(number: A) -> String { -// thousands::Separable::separate_with_spaces(number.borrow()) -// } - pub fn read_u32_ne_bytes(bytes: &[u8]) -> impl Iterator + '_ { bytes.chunks_exact(4).flat_map(TryInto::try_into).map(u32::from_ne_bytes) } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 98b0aa80e..b27f2042f 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -9,7 +9,6 @@ use std::iter::FromIterator; use std::num::{NonZeroU32, NonZeroUsize}; use std::time::Instant; -use byte_unit::Byte; use chrono::Utc; use crossbeam_channel::{Receiver, Sender}; use grenad::{self, CompressionType}; @@ -252,7 +251,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { documents_file, params.clone(), self.log_every_n, - Byte::from_bytes(self.documents_chunk_size.unwrap_or(1024 * 1024 * 128) as u64), // 128MiB + self.documents_chunk_size.unwrap_or(1024 * 1024 * 128), // 128MiB ); let result = chunk_iter.map(|chunk_iter| { diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index c3c71bbf4..5f28034fe 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::fs::File; use heed::types::ByteSlice; @@ -188,8 +189,6 @@ fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec Ok(serialize_roaring_bitmap(&value, buffer)?) } -use std::borrow::Cow; - fn merge_cbo_roaring_bitmaps( new_value: &[u8], db_value: &[u8], @@ -199,11 +198,6 @@ fn merge_cbo_roaring_bitmaps( &[Cow::Borrowed(db_value), Cow::Borrowed(new_value)], buffer, )?) - - // let new_value = CboRoaringBitmapCodec::deserialize_from(new_value)?; - // let db_value = CboRoaringBitmapCodec::deserialize_from(db_value)?; - // let value = new_value | db_value; - // Ok(CboRoaringBitmapCodec::serialize_into(&value, buffer)) } /// Write provided entries in database using serialize_value function.