From d772073dfa5184c9a981e760cb2ff675c2cba3bd Mon Sep 17 00:00:00 2001
From: Tamo
Date: Thu, 28 Sep 2023 16:26:01 +0200
Subject: [PATCH] use a bufreader everytime there is a grenad

---
 milli/src/documents/enriched.rs               |  7 ++--
 milli/src/error.rs                            |  2 ++
 milli/src/update/facet/bulk.rs                |  9 ++---
 milli/src/update/facet/incremental.rs         |  5 +--
 milli/src/update/facet/mod.rs                 |  9 +++--
 milli/src/update/index_documents/enrich.rs    |  4 +--
 .../extract/extract_docid_word_positions.rs   |  3 +-
 .../extract/extract_facet_number_docids.rs    |  4 +--
 .../extract/extract_facet_string_docids.rs    |  4 +--
 .../extract/extract_fid_docid_facet_values.rs | 12 +++----
 .../extract/extract_fid_word_count_docids.rs  |  4 +--
 .../extract/extract_geo_points.rs             |  4 +--
 .../extract/extract_vector_points.rs          |  4 +--
 .../extract/extract_word_docids.rs            |  4 +--
 .../extract/extract_word_fid_docids.rs        |  4 +--
 .../extract_word_pair_proximity_docids.rs     |  3 +-
 .../extract/extract_word_position_docids.rs   |  4 +--
 .../src/update/index_documents/extract/mod.rs | 32 ++++++++++-------
 .../index_documents/helpers/grenad_helpers.rs | 35 +++++++++++--------
 milli/src/update/index_documents/transform.rs | 16 ++++++---
 .../src/update/index_documents/typed_chunk.rs | 28 +++++++--------
 milli/src/update/prefix_word_pairs/mod.rs     | 11 +++---
 22 files changed, 122 insertions(+), 86 deletions(-)

diff --git a/milli/src/documents/enriched.rs b/milli/src/documents/enriched.rs
index fa21c0f87..4e1320c6c 100644
--- a/milli/src/documents/enriched.rs
+++ b/milli/src/documents/enriched.rs
@@ -1,4 +1,5 @@
 use std::fs::File;
+use std::io::BufReader;
 use std::{io, str};
 
 use obkv::KvReader;
@@ -19,14 +20,14 @@ use crate::FieldId;
 pub struct EnrichedDocumentsBatchReader<R> {
     documents: DocumentsBatchReader<R>,
     primary_key: String,
-    external_ids: grenad::ReaderCursor<File>,
+    external_ids: grenad::ReaderCursor<BufReader<File>>,
 }
 
 impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
     pub fn new(
         documents: DocumentsBatchReader<R>,
         primary_key: String,
-        external_ids: grenad::Reader<File>,
+        external_ids: grenad::Reader<BufReader<File>>,
     ) -> Result<Self, Error> {
         if documents.documents_count() as u64 == external_ids.len() {
             Ok(EnrichedDocumentsBatchReader {
@@ -75,7 +76,7 @@ pub struct EnrichedDocument<'a> {
 pub struct EnrichedDocumentsBatchCursor<R> {
     documents: DocumentsBatchCursor<R>,
     primary_key: String,
-    external_ids: grenad::ReaderCursor<File>,
+    external_ids: grenad::ReaderCursor<BufReader<File>>,
 }
 
 impl<R> EnrichedDocumentsBatchCursor<R> {
diff --git a/milli/src/error.rs b/milli/src/error.rs
index e9e1fddd3..303199bcb 100644
--- a/milli/src/error.rs
+++ b/milli/src/error.rs
@@ -47,6 +47,8 @@ pub enum InternalError {
     IndexingMergingKeys { process: &'static str },
     #[error("{}", HeedError::InvalidDatabaseTyping)]
     InvalidDatabaseTyping,
+    #[error("Could not access the inner of a buf-reader/writer: {0}")]
+    BufIntoInnerError(String),
     #[error(transparent)]
     RayonThreadPool(#[from] ThreadPoolBuildError),
     #[error(transparent)]
diff --git a/milli/src/update/facet/bulk.rs b/milli/src/update/facet/bulk.rs
index 30f15ebab..a3f0c8f71 100644
--- a/milli/src/update/facet/bulk.rs
+++ b/milli/src/update/facet/bulk.rs
@@ -1,5 +1,6 @@
 use std::borrow::Cow;
 use std::fs::File;
+use std::io::BufReader;
 
 use grenad::CompressionType;
 use heed::types::ByteSlice;
@@ -30,7 +31,7 @@ pub struct FacetsUpdateBulk<'i> {
     facet_type: FacetType,
     field_ids: Vec<FieldId>,
     // None if level 0 does not need to be updated
-    new_data: Option<grenad::Reader<File>>,
+    new_data: Option<grenad::Reader<BufReader<File>>>,
 }
 
 impl<'i> FacetsUpdateBulk<'i> {
@@ -38,7 +39,7 @@ impl<'i> FacetsUpdateBulk<'i> {
         index: &'i Index,
         field_ids: Vec<FieldId>,
         facet_type: FacetType,
-        new_data: grenad::Reader<File>,
+        new_data: grenad::Reader<BufReader<File>>,
         group_size: u8,
         min_level_size: u8,
     ) -> FacetsUpdateBulk<'i> {
@@ -187,7 +188,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
         &self,
         field_id: FieldId,
         txn: &RoTxn,
-    ) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
+    ) -> Result<(Vec<grenad::Reader<BufReader<File>>>, RoaringBitmap)> {
         let mut all_docids = RoaringBitmap::new();
         let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |bitmaps, _| {
             for bitmap in bitmaps {
@@ -259,7 +260,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
         field_id: u16,
         level: u8,
         handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
-    ) -> Result<Vec<grenad::Reader<File>>> {
+    ) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
         if level == 0 {
             self.read_level_0(rtxn, field_id, handle_group)?;
             // Level 0 is already in the database
diff --git a/milli/src/update/facet/incremental.rs b/milli/src/update/facet/incremental.rs
index a921d4115..743c0b038 100644
--- a/milli/src/update/facet/incremental.rs
+++ b/milli/src/update/facet/incremental.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 use std::fs::File;
+use std::io::BufReader;
 
 use heed::types::{ByteSlice, DecodeIgnore};
 use heed::{BytesDecode, Error, RoTxn, RwTxn};
@@ -34,14 +35,14 @@ pub struct FacetsUpdateIncremental<'i> {
     index: &'i Index,
     inner: FacetsUpdateIncrementalInner,
     facet_type: FacetType,
-    new_data: grenad::Reader<File>,
+    new_data: grenad::Reader<BufReader<File>>,
 }
 
 impl<'i> FacetsUpdateIncremental<'i> {
     pub fn new(
         index: &'i Index,
         facet_type: FacetType,
-        new_data: grenad::Reader<File>,
+        new_data: grenad::Reader<BufReader<File>>,
         group_size: u8,
         min_level_size: u8,
         max_group_size: u8,
diff --git a/milli/src/update/facet/mod.rs b/milli/src/update/facet/mod.rs
index 15776a709..bbd25f91e 100644
--- a/milli/src/update/facet/mod.rs
+++ b/milli/src/update/facet/mod.rs
@@ -78,6 +78,7 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
 
 use std::collections::BTreeSet;
 use std::fs::File;
+use std::io::BufReader;
 use std::iter::FromIterator;
 
 use charabia::normalizer::{Normalize, NormalizerOption};
@@ -108,13 +109,17 @@ pub struct FacetsUpdate<'i> {
     index: &'i Index,
     database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
     facet_type: FacetType,
-    new_data: grenad::Reader<File>,
+    new_data: grenad::Reader<BufReader<File>>,
     group_size: u8,
     max_group_size: u8,
     min_level_size: u8,
 }
 impl<'i> FacetsUpdate<'i> {
-    pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
+    pub fn new(
+        index: &'i Index,
+        facet_type: FacetType,
+        new_data: grenad::Reader<BufReader<File>>,
+    ) -> Self {
         let database = match facet_type {
             FacetType::String => index
                 .facet_id_string_docids
diff --git a/milli/src/update/index_documents/enrich.rs b/milli/src/update/index_documents/enrich.rs
index 35a7c33f3..22b16f253 100644
--- a/milli/src/update/index_documents/enrich.rs
+++ b/milli/src/update/index_documents/enrich.rs
@@ -1,4 +1,4 @@
-use std::io::{Read, Seek};
+use std::io::{BufWriter, Read, Seek};
 use std::result::Result as StdResult;
 use std::{fmt, iter};
 
@@ -35,7 +35,7 @@ pub fn enrich_documents_batch<R: Read + Seek>(
 
     let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
 
-    let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
+    let mut external_ids = tempfile::tempfile().map(BufWriter::new).map(grenad::Writer::new)?;
     let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];
 
     // The primary key *field id* that has already been set for this index or the one
diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
index 1c24a0fcf..643d16354 100644
--- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
+++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::convert::TryInto;
 use std::fs::File;
+use std::io::BufReader;
 use std::{io, mem, str};
 
 use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
@@ -31,7 +32,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
     allowed_separators: Option<&[&str]>,
     dictionary: Option<&[&str]>,
     max_positions_per_attributes: Option<u32>,
-) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
+) -> Result<(RoaringBitmap, grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
     puffin::profile_function!();
 
     let max_positions_per_attributes = max_positions_per_attributes
diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
index dec02b120..d557e0b6c 100644
--- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use heed::{BytesDecode, BytesEncode};
 
@@ -19,7 +19,7 @@ use crate::Result;
 pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
     docid_fid_facet_number: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
index 0035f54e1..b1b27449e 100644
--- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use heed::BytesEncode;
 
@@ -17,7 +17,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
 pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
     docid_fid_facet_string: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
index 5496a071b..42c355323 100644
--- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
@@ -1,7 +1,7 @@
 use std::collections::{BTreeMap, HashSet};
 use std::convert::TryInto;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 use std::mem::size_of;
 
 use heed::zerocopy::AsBytes;
@@ -17,11 +17,11 @@ use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET
 
 /// The extracted facet values stored in grenad files by type.
 pub struct ExtractedFacetValues {
-    pub docid_fid_facet_numbers_chunk: grenad::Reader<File>,
-    pub docid_fid_facet_strings_chunk: grenad::Reader<File>,
-    pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
-    pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
-    pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
+    pub docid_fid_facet_numbers_chunk: grenad::Reader<BufReader<File>>,
+    pub docid_fid_facet_strings_chunk: grenad::Reader<BufReader<File>>,
+    pub fid_facet_is_null_docids_chunk: grenad::Reader<BufReader<File>>,
+    pub fid_facet_is_empty_docids_chunk: grenad::Reader<BufReader<File>>,
+    pub fid_facet_exists_docids_chunk: grenad::Reader<BufReader<File>>,
 }
 
 /// Extracts the facet values of each faceted field of each document.
diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
index 79cf4c7fe..92564b4cd 100644
--- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
@@ -1,6 +1,6 @@
 use std::collections::HashMap;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use grenad::Sorter;
 
@@ -21,7 +21,7 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
 pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs
index 139e8230a..285a4bdba 100644
--- a/milli/src/update/index_documents/extract/extract_geo_points.rs
+++ b/milli/src/update/index_documents/extract/extract_geo_points.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use concat_arrays::concat_arrays;
 use serde_json::Value;
@@ -18,7 +18,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     primary_key_id: FieldId,
     (lat_fid, lng_fid): (FieldId, FieldId),
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let mut writer = create_writer(
diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs
index dd5a4d3a7..863bc07c3 100644
--- a/milli/src/update/index_documents/extract/extract_vector_points.rs
+++ b/milli/src/update/index_documents/extract/extract_vector_points.rs
@@ -1,6 +1,6 @@
 use std::convert::TryFrom;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use bytemuck::cast_slice;
 use serde_json::{from_slice, Value};
@@ -18,7 +18,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
     indexer: GrenadParameters,
     primary_key_id: FieldId,
     vectors_fid: FieldId,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let mut writer = create_writer(
diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs
index f1656d024..f211f7023 100644
--- a/milli/src/update/index_documents/extract/extract_word_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_docids.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 use std::iter::FromIterator;
 
 use roaring::RoaringBitmap;
@@ -26,7 +26,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
     exact_attributes: &HashSet<FieldId>,
-) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
+) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_word_fid_docids.rs b/milli/src/update/index_documents/extract/extract_word_fid_docids.rs
index aaf8fad79..09f571038 100644
--- a/milli/src/update/index_documents/extract/extract_word_fid_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_fid_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use super::helpers::{
     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@@ -14,7 +14,7 @@ use crate::{relative_from_absolute_position, DocumentId, Result};
 pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
index 4c910f32e..9ddd5ff4c 100644
--- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
@@ -1,6 +1,7 @@
 use std::cmp::Ordering;
 use std::collections::{BinaryHeap, HashMap};
 use std::fs::File;
+use std::io::BufReader;
 use std::{cmp, io, mem, str, vec};
 
 use super::helpers::{
@@ -20,7 +21,7 @@ use crate::{DocumentId, Result};
 pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
index e945833e6..94139ddf8 100644
--- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use super::helpers::{
     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@@ -17,7 +17,7 @@ use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Resu
 pub fn extract_word_position_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
 
     let max_memory = indexer.max_memory_by_thread();
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index c3a023e71..f44eac8f5 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -12,6 +12,7 @@ mod extract_word_position_docids;
 
 use std::collections::HashSet;
 use std::fs::File;
+use std::io::BufReader;
 
 use crossbeam_channel::Sender;
 use log::debug;
@@ -39,8 +40,8 @@ use crate::{FieldId, Result};
 /// Send data in grenad file over provided Sender.
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn data_from_obkv_documents(
-    original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
-    flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
+    original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
+    flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: Option<HashSet<FieldId>>,
@@ -152,7 +153,7 @@ pub(crate) fn data_from_obkv_documents(
         });
     }
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -162,7 +163,7 @@ pub(crate) fn data_from_obkv_documents(
         "word-pair-proximity-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -172,7 +173,11 @@ pub(crate) fn data_from_obkv_documents(
         "field-id-wordcount-docids",
    );
 
-    spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
+    spawn_extraction_task::<
+        _,
+        _,
+        Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)>,
+    >(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -185,7 +190,7 @@ pub(crate) fn data_from_obkv_documents(
         "word-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks.clone(),
         indexer,
         lmdb_writer_sx.clone(),
@@ -194,7 +199,7 @@ pub(crate) fn data_from_obkv_documents(
         TypedChunk::WordPositionDocids,
         "word-position-docids",
     );
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_word_positions_chunks,
         indexer,
         lmdb_writer_sx.clone(),
@@ -204,7 +209,7 @@ pub(crate) fn data_from_obkv_documents(
         "word-fid-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_fid_facet_strings_chunks,
         indexer,
         lmdb_writer_sx.clone(),
@@ -214,7 +219,7 @@ pub(crate) fn data_from_obkv_documents(
         "field-id-facet-string-docids",
     );
 
-    spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
+    spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
         docid_fid_facet_numbers_chunks,
         indexer,
         lmdb_writer_sx,
@@ -269,7 +274,7 @@ fn spawn_extraction_task(
 /// Extract chunked data and send it into lmdb_writer_sx sender:
 /// - documents
 fn send_original_documents_data(
-    original_documents_chunk: Result<grenad::Reader<File>>,
+    original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     vectors_field_id: Option<FieldId>,
@@ -311,7 +316,7 @@ fn send_original_documents_data(
 #[allow(clippy::too_many_arguments)]
 #[allow(clippy::type_complexity)]
 fn send_and_extract_flattened_documents_data(
-    flattened_documents_chunk: Result<grenad::Reader<File>>,
+    flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: &Option<HashSet<FieldId>>,
@@ -328,7 +333,10 @@ fn send_and_extract_flattened_documents_data(
         grenad::Reader<CursorClonableMmap>,
         (
             grenad::Reader<CursorClonableMmap>,
-            (grenad::Reader<File>, (grenad::Reader<File>, grenad::Reader<File>)),
+            (
+                grenad::Reader<BufReader<File>>,
+                (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>),
+            ),
         ),
     ),
 )> {
diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs
index d5f5ac0bd..f25fa9c53 100644
--- a/milli/src/update/index_documents/helpers/grenad_helpers.rs
+++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs
@@ -1,6 +1,6 @@
 use std::borrow::Cow;
 use std::fs::File;
-use std::io::{self, Seek};
+use std::io::{self, BufReader, BufWriter, Seek};
 use std::time::Instant;
 
 use grenad::{CompressionType, Sorter};
@@ -17,13 +17,13 @@ pub fn create_writer<R: io::Write>(
     typ: grenad::CompressionType,
     level: Option<u32>,
     file: R,
-) -> grenad::Writer<R> {
+) -> grenad::Writer<BufWriter<R>> {
     let mut builder = grenad::Writer::builder();
     builder.compression_type(typ);
     if let Some(level) = level {
         builder.compression_level(level);
     }
-    builder.build(file)
+    builder.build(BufWriter::new(file))
 }
 
 pub fn create_sorter(
@@ -53,7 +53,7 @@ pub fn sorter_into_reader(
     sorter: grenad::Sorter<MergeFn>,
     indexer: GrenadParameters,
-) -> Result<grenad::Reader<File>> {
+) -> Result<grenad::Reader<BufReader<File>>> {
     let mut writer = create_writer(
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
@@ -64,16 +64,21 @@ pub fn sorter_into_reader(
     writer_into_reader(writer)
 }
 
-pub fn writer_into_reader(writer: grenad::Writer<File>) -> Result<grenad::Reader<File>> {
-    let mut file = writer.into_inner()?;
+pub fn writer_into_reader(
+    writer: grenad::Writer<BufWriter<File>>,
+) -> Result<grenad::Reader<BufReader<File>>> {
+    let mut file = writer
+        .into_inner()?
+        .into_inner()
+        .map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?;
     file.rewind()?;
-    grenad::Reader::new(file).map_err(Into::into)
+    grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
 }
 
 pub unsafe fn as_cloneable_grenad(
-    reader: &grenad::Reader<File>,
+    reader: &grenad::Reader<BufReader<File>>,
 ) -> Result<grenad::Reader<CursorClonableMmap>> {
-    let file = reader.get_ref();
+    let file = reader.get_ref().get_ref();
     let mmap = memmap2::Mmap::map(file)?;
     let cursor = io::Cursor::new(ClonableMmap::from(mmap));
     let reader = grenad::Reader::new(cursor)?;
@@ -89,8 +94,8 @@ where
     fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
 }
 
-impl MergeableReader for Vec<grenad::Reader<File>> {
-    type Output = grenad::Reader<File>;
+impl MergeableReader for Vec<grenad::Reader<BufReader<File>>> {
+    type Output = grenad::Reader<BufReader<File>>;
 
     fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
         let mut merger = MergerBuilder::new(merge_fn);
@@ -99,8 +104,8 @@ impl MergeableReader for Vec<grenad::Reader<File>> {
     }
 }
 
-impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>)> {
-    type Output = (grenad::Reader<File>, grenad::Reader<File>);
+impl MergeableReader for Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
+    type Output = (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>);
 
     fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
         let mut m1 = MergerBuilder::new(merge_fn);
@@ -125,7 +130,7 @@ impl<R: io::Read + io::Seek> MergerBuilder<R> {
         Ok(())
     }
 
-    fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<File>> {
+    fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<BufReader<File>>> {
         let merger = self.0.build();
         let mut writer = create_writer(
             params.chunk_compression_type,
@@ -176,7 +181,7 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
     reader: grenad::Reader<R>,
     indexer: GrenadParameters,
     documents_chunk_size: usize,
-) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> {
+) -> Result<impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>>> {
     let mut continue_reading = true;
     let mut cursor = reader.into_cursor()?;
 
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index 7a0c811a8..450cc705e 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -659,8 +659,12 @@ impl<'a, 'i> Transform<'a, 'i> {
             new_documents_ids: self.new_documents_ids,
             replaced_documents_ids: self.replaced_documents_ids,
             documents_count: self.documents_count,
-            original_documents,
-            flattened_documents,
+            original_documents: original_documents
+                .into_inner()
+                .map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
+            flattened_documents: flattened_documents
+                .into_inner()
+                .map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
         })
     }
 
@@ -779,8 +783,12 @@ impl<'a, 'i> Transform<'a, 'i> {
             new_documents_ids: documents_ids,
             replaced_documents_ids: RoaringBitmap::default(),
             documents_count,
-            original_documents,
-            flattened_documents,
+            original_documents: original_documents
+                .into_inner()
+                .map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
+            flattened_documents: flattened_documents
+                .into_inner()
+                .map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
         };
 
         let new_facets = output.compute_real_facets(wtxn, self.index)?;
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index 788aaf93d..5895a69c5 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -2,7 +2,7 @@ use std::borrow::Cow;
 use std::collections::HashMap;
 use std::convert::TryInto;
 use std::fs::File;
-use std::io;
+use std::io::{self, BufReader};
 
 use bytemuck::allocation::pod_collect_to_vec;
 use charabia::{Language, Script};
@@ -27,22 +27,22 @@ pub(crate) enum TypedChunk {
     FieldIdDocidFacetStrings(grenad::Reader<CursorClonableMmap>),
     FieldIdDocidFacetNumbers(grenad::Reader<CursorClonableMmap>),
     Documents(grenad::Reader<CursorClonableMmap>),
-    FieldIdWordcountDocids(grenad::Reader<File>),
+    FieldIdWordcountDocids(grenad::Reader<BufReader<File>>),
     NewDocumentsIds(RoaringBitmap),
     WordDocids {
-        word_docids_reader: grenad::Reader<File>,
-        exact_word_docids_reader: grenad::Reader<File>,
+        word_docids_reader: grenad::Reader<BufReader<File>>,
+        exact_word_docids_reader: grenad::Reader<BufReader<File>>,
     },
-    WordPositionDocids(grenad::Reader<File>),
-    WordFidDocids(grenad::Reader<File>),
-    WordPairProximityDocids(grenad::Reader<File>),
-    FieldIdFacetStringDocids(grenad::Reader<File>),
-    FieldIdFacetNumberDocids(grenad::Reader<File>),
-    FieldIdFacetExistsDocids(grenad::Reader<File>),
-    FieldIdFacetIsNullDocids(grenad::Reader<File>),
-    FieldIdFacetIsEmptyDocids(grenad::Reader<File>),
-    GeoPoints(grenad::Reader<File>),
-    VectorPoints(grenad::Reader<File>),
+    WordPositionDocids(grenad::Reader<BufReader<File>>),
+    WordFidDocids(grenad::Reader<BufReader<File>>),
+    WordPairProximityDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetStringDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetNumberDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetExistsDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
+    FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
+    GeoPoints(grenad::Reader<BufReader<File>>),
+    VectorPoints(grenad::Reader<BufReader<File>>),
     ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
 }
 
diff --git a/milli/src/update/prefix_word_pairs/mod.rs b/milli/src/update/prefix_word_pairs/mod.rs
index 3105b16e4..d70cad117 100644
--- a/milli/src/update/prefix_word_pairs/mod.rs
+++ b/milli/src/update/prefix_word_pairs/mod.rs
@@ -1,12 +1,12 @@
 use std::borrow::Cow;
 use std::collections::HashSet;
-use std::io::BufReader;
+use std::io::{BufReader, BufWriter};
 
 use grenad::CompressionType;
 use heed::types::ByteSlice;
 
 use super::index_documents::{merge_cbo_roaring_bitmaps, CursorClonableMmap};
-use crate::{Index, Result};
+use crate::{Index, InternalError, Result};
 
 mod prefix_word;
 mod word_prefix;
@@ -119,9 +119,12 @@ pub fn insert_into_database(
 pub fn write_into_lmdb_database_without_merging(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
-    writer: grenad::Writer<std::fs::File>,
+    writer: grenad::Writer<BufWriter<std::fs::File>>,
 ) -> Result<()> {
-    let file = writer.into_inner()?;
+    let file = writer
+        .into_inner()?
+        .into_inner()
+        .map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?;
     let reader = grenad::Reader::new(BufReader::new(file))?;
     if database.is_empty(wtxn)? {
         let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;