diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 9a452114a..81bd5e9fa 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -616,6 +616,12 @@ fn documents_merge(key: &[u8], _values: &[Vec]) -> Result, ()> { panic!("merging documents is an error ({:?})", key.as_bstr()) } +fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { + let mut builder = Merger::builder(merge); + builder.extend(sources); + builder.build() +} + fn merge_into_lmdb_database( wtxn: &mut heed::RwTxn, database: heed::PolyDatabase, @@ -625,11 +631,28 @@ fn merge_into_lmdb_database( debug!("Merging {} MTBL stores...", sources.len()); let before = Instant::now(); - let mut builder = Merger::builder(merge); - builder.extend(sources); - let merger = builder.build(); - + let merger = merge_readers(sources, merge); let mut in_iter = merger.into_merge_iter()?; + + let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; + while let Some(result) = in_iter.next() { + let (k, v) = result?; + out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; + } + + debug!("MTBL stores merged in {:.02?}!", before.elapsed()); + Ok(()) +} + +fn write_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + reader: Reader, +) -> anyhow::Result<()> { + debug!("Writing MTBL stores..."); + let before = Instant::now(); + + let mut in_iter = reader.into_iter()?; let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; while let Some(result) = in_iter.next() { let (k, v) = result?; @@ -742,39 +765,61 @@ fn main() -> anyhow::Result<()> { }) .collect::, _>>()?; - let mut main_stores = Vec::with_capacity(readers.len()); - let mut word_docids_stores = Vec::with_capacity(readers.len()); - let mut docid_word_positions_stores = Vec::with_capacity(readers.len()); - let mut words_pairs_proximities_docids_stores = Vec::with_capacity(readers.len()); - let mut documents_stores = Vec::with_capacity(readers.len()); + let mut main_readers = Vec::with_capacity(readers.len()); + let mut word_docids_readers = Vec::with_capacity(readers.len()); + let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); + let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); + let mut documents_readers = Vec::with_capacity(readers.len()); readers.into_iter().for_each(|readers| { - main_stores.push(readers.main); - word_docids_stores.push(readers.word_docids); - docid_word_positions_stores.push(readers.docid_word_positions); - words_pairs_proximities_docids_stores.push(readers.words_pairs_proximities_docids); - documents_stores.push(readers.documents); + main_readers.push(readers.main); + word_docids_readers.push(readers.word_docids); + docid_word_positions_readers.push(readers.docid_word_positions); + words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); + documents_readers.push(readers.documents); }); + let merge_readers = |readers, merge| { + let mut writer = tempfile().map(|f| { + create_writer(chunk_compression_type, chunk_compression_level, f) + })?; + let merger = merge_readers(readers, merge); + merger.write_into(&mut writer)?; + writer_into_reader(writer) + }; + + debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); + let (main, (word_docids, words_pairs_proximities_docids)) = rayon::join(move || { + merge_readers(main_readers, main_merge) + }, || rayon::join(|| { + merge_readers(word_docids_readers, word_docids_merge) + }, || { + merge_readers(words_pairs_proximities_docids_readers, words_pairs_proximities_docids_merge) + })); + + let main = main?; + let word_docids = word_docids?; + let words_pairs_proximities_docids = words_pairs_proximities_docids?; + let mut wtxn = env.write_txn()?; debug!("Writing the main elements into LMDB on disk..."); - merge_into_lmdb_database(&mut wtxn, index.main, main_stores, main_merge)?; + write_into_lmdb_database(&mut wtxn, index.main, main)?; debug!("Writing the words docids into LMDB on disk..."); let db = *index.word_docids.as_polymorph(); - merge_into_lmdb_database(&mut wtxn, db, word_docids_stores, word_docids_merge)?; + write_into_lmdb_database(&mut wtxn, db, word_docids)?; debug!("Writing the docid word positions into LMDB on disk..."); let db = *index.docid_word_positions.as_polymorph(); - merge_into_lmdb_database(&mut wtxn, db, docid_word_positions_stores, docid_word_positions_merge)?; + merge_into_lmdb_database(&mut wtxn, db, docid_word_positions_readers, docid_word_positions_merge)?; debug!("Writing the words pairs proximities docids into LMDB on disk..."); let db = *index.word_pair_proximity_docids.as_polymorph(); - merge_into_lmdb_database(&mut wtxn, db, words_pairs_proximities_docids_stores, words_pairs_proximities_docids_merge)?; + write_into_lmdb_database(&mut wtxn, db, words_pairs_proximities_docids)?; debug!("Writing the documents into LMDB on disk..."); let db = *index.documents.as_polymorph(); - merge_into_lmdb_database(&mut wtxn, db, documents_stores, documents_merge)?; + merge_into_lmdb_database(&mut wtxn, db, documents_readers, documents_merge)?; debug!("Retrieving the number of documents..."); let count = index.number_of_documents(&wtxn)?;