diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index d53173b71..b6fde7ef4 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -8,7 +8,7 @@ use std::time::Instant; use anyhow::Context; use bstr::ByteSlice as _; -use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; +use grenad::{MergerIter, Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use heed::types::ByteSlice; use log::{debug, info, error}; use memmap::Mmap; @@ -102,39 +102,19 @@ pub fn merge_into_lmdb_database( sources: Vec>, merge: MergeFn, method: WriteMethod, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +{ debug!("Merging {} MTBL stores...", sources.len()); let before = Instant::now(); let merger = merge_readers(sources, merge); - let mut in_iter = merger.into_merge_iter()?; - - match method { - WriteMethod::Append => { - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = in_iter.next()? { - out_iter.append(k, v).with_context(|| { - format!("writing {:?} into LMDB", k.as_bstr()) - })?; - } - }, - WriteMethod::GetMergePut => { - while let Some((k, v)) = in_iter.next()? { - let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; - match iter.next().transpose()? { - Some((key, old_val)) if key == k => { - let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; - let val = merge(k, &vals).expect("merge failed"); - iter.put_current(k, &val)?; - }, - _ => { - drop(iter); - database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; - }, - } - } - }, - } + merger_iter_into_lmdb_database( + wtxn, + database, + merger.into_merge_iter()?, + merge, + method, + )?; debug!("MTBL stores merged in {:.02?}!", before.elapsed()); Ok(()) @@ -146,7 +126,8 @@ pub fn write_into_lmdb_database( mut reader: Reader, merge: MergeFn, method: WriteMethod, -) -> anyhow::Result<()> { +) -> anyhow::Result<()> +{ debug!("Writing MTBL stores..."); let before = Instant::now(); @@ -181,6 +162,67 @@ pub fn write_into_lmdb_database( Ok(()) } +pub fn sorter_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + sorter: Sorter, + merge: MergeFn, + method: WriteMethod, +) -> anyhow::Result<()> +{ + debug!("Writing MTBL sorter..."); + let before = Instant::now(); + + merger_iter_into_lmdb_database( + wtxn, + database, + sorter.into_iter()?, + merge, + method, + )?; + + debug!("MTBL sorter writen in {:.02?}!", before.elapsed()); + Ok(()) +} + +fn merger_iter_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + mut sorter: MergerIter, + merge: MergeFn, + method: WriteMethod, +) -> anyhow::Result<()> +{ + match method { + WriteMethod::Append => { + let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; + while let Some((k, v)) = sorter.next()? { + out_iter.append(k, v).with_context(|| { + format!("writing {:?} into LMDB", k.as_bstr()) + })?; + } + }, + WriteMethod::GetMergePut => { + while let Some((k, v)) = sorter.next()? { + let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; + match iter.next().transpose()? { + Some((key, old_val)) if key == k => { + let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; + let val = merge(k, &vals).expect("merge failed"); + iter.put_current(k, &val)?; + }, + _ => { + drop(iter); + database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; + }, + } + } + }, + } + + Ok(()) +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[non_exhaustive] pub enum IndexDocumentsMethod { diff --git a/milli/src/update/words_prefixes.rs b/milli/src/update/words_prefixes.rs index b020ed28b..f7c898c89 100644 --- a/milli/src/update/words_prefixes.rs +++ b/milli/src/update/words_prefixes.rs @@ -9,7 +9,7 @@ use heed::types::ByteSlice; use crate::heed_codec::StrStrU8Codec; use crate::update::index_documents::WriteMethod; -use crate::update::index_documents::{create_sorter, create_writer, writer_into_reader, write_into_lmdb_database}; +use crate::update::index_documents::{create_sorter, sorter_into_lmdb_database}; use crate::update::index_documents::{word_docids_merge, words_pairs_proximities_docids_merge}; use crate::{Index, SmallString32}; @@ -144,21 +144,11 @@ impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> { // Set the words prefixes FST in the dtabase. self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?; - // We write the sorter into a reader to be able to read it back. - let mut prefix_docids_writer = tempfile::tempfile().and_then(|file| { - create_writer(self.chunk_compression_type, self.chunk_compression_level, file) - })?; - prefix_docids_sorter.write_into(&mut prefix_docids_writer)?; - let prefix_docids_reader = writer_into_reader( - prefix_docids_writer, - self.chunk_fusing_shrink_size, - )?; - // We finally write the word prefix docids into the LMDB database. - write_into_lmdb_database( + sorter_into_lmdb_database( self.wtxn, *self.index.word_prefix_docids.as_polymorph(), - prefix_docids_reader, + prefix_docids_sorter, word_docids_merge, WriteMethod::Append, )?; @@ -190,22 +180,11 @@ impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> { } } - // FIXME we should create a sorter_into_lmdb_database function - // We write the sorter into a reader to be able to read it back. - let mut word_prefix_pair_prox_docids_writer = tempfile::tempfile().and_then(|file| { - create_writer(self.chunk_compression_type, self.chunk_compression_level, file) - })?; - word_prefix_pair_proximity_docids_sorter.write_into(&mut word_prefix_pair_prox_docids_writer)?; - let word_prefix_pair_docids_reader = writer_into_reader( - word_prefix_pair_prox_docids_writer, - self.chunk_fusing_shrink_size, - )?; - // We finally write the word prefix pair proximity docids into the LMDB database. - write_into_lmdb_database( + sorter_into_lmdb_database( self.wtxn, *self.index.word_prefix_pair_proximity_docids.as_polymorph(), - word_prefix_pair_docids_reader, + word_prefix_pair_proximity_docids_sorter, words_pairs_proximities_docids_merge, WriteMethod::Append, )?;