Merge other databases content while writing into LMDB at the same time

This commit is contained in:
Kerollmops 2020-10-05 15:33:45 +02:00
parent 9af946a306
commit bb15f16d8c
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -4,8 +4,9 @@ use std::fs::File;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::iter::FromIterator; use std::iter::FromIterator;
use std::path::PathBuf; use std::path::PathBuf;
use std::{iter, thread}; use std::sync::mpsc::sync_channel;
use std::time::Instant; use std::time::Instant;
use std::{iter, thread};
use anyhow::{Context, bail}; use anyhow::{Context, bail};
use bstr::ByteSlice as _; use bstr::ByteSlice as _;
@ -778,7 +779,9 @@ fn main() -> anyhow::Result<()> {
documents_readers.push(readers.documents); documents_readers.push(readers.documents);
}); });
let merge_readers = |readers, merge| { // This is the function that merge the readers
// by using the given merge function.
let merge_readers = move |readers, merge| {
let mut writer = tempfile().map(|f| { let mut writer = tempfile().map(|f| {
create_writer(chunk_compression_type, chunk_compression_level, f) create_writer(chunk_compression_type, chunk_compression_level, f)
})?; })?;
@ -787,39 +790,69 @@ fn main() -> anyhow::Result<()> {
writer_into_reader(writer) writer_into_reader(writer)
}; };
debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); // The enum and the channel which is used to transfert
let (main, (word_docids, words_pairs_proximities_docids)) = rayon::join(move || { // the readers merges potentially done on another thread.
merge_readers(main_readers, main_merge) enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids };
}, || rayon::join(|| { let (sender, receiver) = sync_channel(3);
merge_readers(word_docids_readers, word_docids_merge) let main_sender = sender.clone();
}, || { let word_docids_sender = sender.clone();
merge_readers(words_pairs_proximities_docids_readers, words_pairs_proximities_docids_merge) let words_pairs_proximities_docids_sender = sender;
}));
let main = main?; debug!("Merging the main, word docids and words pairs proximity docids in parallel...");
let word_docids = word_docids?; rayon::spawn(move || {
let words_pairs_proximities_docids = words_pairs_proximities_docids?; let result = merge_readers(main_readers, main_merge);
main_sender.send((DatabaseType::Main, result)).unwrap();
});
rayon::spawn(move || {
let result = merge_readers(word_docids_readers, word_docids_merge);
word_docids_sender.send((DatabaseType::WordDocids, result)).unwrap();
});
rayon::spawn(move || {
let result = merge_readers(
words_pairs_proximities_docids_readers,
words_pairs_proximities_docids_merge,
);
let message = (DatabaseType::WordsPairsProximitiesDocids, result);
words_pairs_proximities_docids_sender.send(message).unwrap();
});
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;
debug!("Writing the main elements into LMDB on disk...");
write_into_lmdb_database(&mut wtxn, index.main, main)?;
debug!("Writing the words docids into LMDB on disk...");
let db = *index.word_docids.as_polymorph();
write_into_lmdb_database(&mut wtxn, db, word_docids)?;
debug!("Writing the docid word positions into LMDB on disk..."); debug!("Writing the docid word positions into LMDB on disk...");
let db = *index.docid_word_positions.as_polymorph(); merge_into_lmdb_database(
merge_into_lmdb_database(&mut wtxn, db, docid_word_positions_readers, docid_word_positions_merge)?; &mut wtxn,
*index.docid_word_positions.as_polymorph(),
debug!("Writing the words pairs proximities docids into LMDB on disk..."); docid_word_positions_readers,
let db = *index.word_pair_proximity_docids.as_polymorph(); docid_word_positions_merge,
write_into_lmdb_database(&mut wtxn, db, words_pairs_proximities_docids)?; )?;
debug!("Writing the documents into LMDB on disk..."); debug!("Writing the documents into LMDB on disk...");
let db = *index.documents.as_polymorph(); merge_into_lmdb_database(
merge_into_lmdb_database(&mut wtxn, db, documents_readers, documents_merge)?; &mut wtxn,
*index.documents.as_polymorph(),
documents_readers,
documents_merge,
)?;
for (db_type, result) in receiver {
let content = result?;
match db_type {
DatabaseType::Main => {
debug!("Writing the main elements into LMDB on disk...");
write_into_lmdb_database(&mut wtxn, index.main, content)?;
},
DatabaseType::WordDocids => {
debug!("Writing the words docids into LMDB on disk...");
let db = *index.word_docids.as_polymorph();
write_into_lmdb_database(&mut wtxn, db, content)?;
},
DatabaseType::WordsPairsProximitiesDocids => {
debug!("Writing the words pairs proximities docids into LMDB on disk...");
let db = *index.word_pair_proximity_docids.as_polymorph();
write_into_lmdb_database(&mut wtxn, db, content)?;
},
}
}
debug!("Retrieving the number of documents..."); debug!("Retrieving the number of documents...");
let count = index.number_of_documents(&wtxn)?; let count = index.number_of_documents(&wtxn)?;