diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 81bd5e9fa..f1acfbf15 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -4,8 +4,9 @@ use std::fs::File; use std::io::{self, Read, Write}; use std::iter::FromIterator; use std::path::PathBuf; -use std::{iter, thread}; +use std::sync::mpsc::sync_channel; use std::time::Instant; +use std::{iter, thread}; use anyhow::{Context, bail}; use bstr::ByteSlice as _; @@ -778,7 +779,9 @@ fn main() -> anyhow::Result<()> { documents_readers.push(readers.documents); }); - let merge_readers = |readers, merge| { + // This is the function that merges the readers + // by using the given merge function. + let merge_readers = move |readers, merge| { let mut writer = tempfile().map(|f| { create_writer(chunk_compression_type, chunk_compression_level, f) })?; @@ -787,39 +790,69 @@ fn main() -> anyhow::Result<()> { writer_into_reader(writer) }; - debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); - let (main, (word_docids, words_pairs_proximities_docids)) = rayon::join(move || { - merge_readers(main_readers, main_merge) - }, || rayon::join(|| { - merge_readers(word_docids_readers, word_docids_merge) - }, || { - merge_readers(words_pairs_proximities_docids_readers, words_pairs_proximities_docids_merge) - })); + // The enum and the channel which are used to transfer + // the readers merges potentially done on another thread. 
+ enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; + let (sender, receiver) = sync_channel(3); + let main_sender = sender.clone(); + let word_docids_sender = sender.clone(); + let words_pairs_proximities_docids_sender = sender; - let main = main?; - let word_docids = word_docids?; - let words_pairs_proximities_docids = words_pairs_proximities_docids?; + debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); + rayon::spawn(move || { + let result = merge_readers(main_readers, main_merge); + main_sender.send((DatabaseType::Main, result)).unwrap(); + }); + rayon::spawn(move || { + let result = merge_readers(word_docids_readers, word_docids_merge); + word_docids_sender.send((DatabaseType::WordDocids, result)).unwrap(); + }); + rayon::spawn(move || { + let result = merge_readers( + words_pairs_proximities_docids_readers, + words_pairs_proximities_docids_merge, + ); + let message = (DatabaseType::WordsPairsProximitiesDocids, result); + words_pairs_proximities_docids_sender.send(message).unwrap(); + }); let mut wtxn = env.write_txn()?; - debug!("Writing the main elements into LMDB on disk..."); - write_into_lmdb_database(&mut wtxn, index.main, main)?; - - debug!("Writing the words docids into LMDB on disk..."); - let db = *index.word_docids.as_polymorph(); - write_into_lmdb_database(&mut wtxn, db, word_docids)?; - debug!("Writing the docid word positions into LMDB on disk..."); - let db = *index.docid_word_positions.as_polymorph(); - merge_into_lmdb_database(&mut wtxn, db, docid_word_positions_readers, docid_word_positions_merge)?; - - debug!("Writing the words pairs proximities docids into LMDB on disk..."); - let db = *index.word_pair_proximity_docids.as_polymorph(); - write_into_lmdb_database(&mut wtxn, db, words_pairs_proximities_docids)?; + merge_into_lmdb_database( + &mut wtxn, + *index.docid_word_positions.as_polymorph(), + docid_word_positions_readers, + docid_word_positions_merge, + )?; debug!("Writing the 
documents into LMDB on disk..."); - let db = *index.documents.as_polymorph(); - merge_into_lmdb_database(&mut wtxn, db, documents_readers, documents_merge)?; + merge_into_lmdb_database( + &mut wtxn, + *index.documents.as_polymorph(), + documents_readers, + documents_merge, + )?; + + for (db_type, result) in receiver { + let content = result?; + match db_type { + DatabaseType::Main => { + debug!("Writing the main elements into LMDB on disk..."); + write_into_lmdb_database(&mut wtxn, index.main, content)?; + }, + DatabaseType::WordDocids => { + debug!("Writing the words docids into LMDB on disk..."); + let db = *index.word_docids.as_polymorph(); + write_into_lmdb_database(&mut wtxn, db, content)?; + }, + DatabaseType::WordsPairsProximitiesDocids => { + debug!("Writing the words pairs proximities docids into LMDB on disk..."); + let db = *index.word_pair_proximity_docids.as_polymorph(); + write_into_lmdb_database(&mut wtxn, db, content)?; + }, + } + } debug!("Retrieving the number of documents..."); let count = index.number_of_documents(&wtxn)?;