Run the final merge steps in parallel

This commit is contained in:
Clément Renault 2020-08-07 15:44:04 +02:00
parent 91282c8b6a
commit ecd2b2f217
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -426,6 +426,7 @@ fn main() -> anyhow::Result<()> {
let index = Index::new(&env)?; let index = Index::new(&env)?;
let documents_path = opt.database.join("documents.mtbl");
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
let arc_cache_size = opt.arc_cache_size; let arc_cache_size = opt.arc_cache_size;
let max_nb_chunks = opt.max_nb_chunks; let max_nb_chunks = opt.max_nb_chunks;
@ -483,25 +484,29 @@ fn main() -> anyhow::Result<()> {
docs_stores.push(d); docs_stores.push(d);
}); });
debug!("We are writing into LMDB..."); debug!("We are writing into LMDB and MTBL...");
let mut wtxn = env.write_txn()?;
// We run both merging steps in parallel.
let (lmdb, mtbl) = rayon::join(|| {
// We merge the postings lists into LMDB. // We merge the postings lists into LMDB.
let mut wtxn = env.write_txn()?;
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
Ok(wtxn.commit()?) as anyhow::Result<_>
}, || {
// We also merge the documents into its own MTBL store. // We also merge the documents into its own MTBL store.
let path = opt.database.join("documents.mtbl"); let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(documents_path)?;
let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(path)?;
let mut writer = Writer::builder().compression_type(CompressionType::Snappy).build(file); let mut writer = Writer::builder().compression_type(CompressionType::Snappy).build(file);
let mut builder = Merger::builder(docs_merge); let mut builder = Merger::builder(docs_merge);
builder.extend(docs_stores); builder.extend(docs_stores);
builder.build().write_into(&mut writer)?; builder.build().write_into(&mut writer)?;
let file = writer.into_inner()?; Ok(writer.into_inner()?) as anyhow::Result<_>
});
let file = lmdb.and(mtbl)?;
let mmap = unsafe { Mmap::map(&file)? }; let mmap = unsafe { Mmap::map(&file)? };
let documents = Reader::new(&mmap)?; let documents = Reader::new(mmap)?;
let count = documents.metadata().count_entries; let count = documents.metadata().count_entries;
wtxn.commit()?;
debug!("Wrote {} documents into LMDB", count); debug!("Wrote {} documents into LMDB", count);
Ok(()) Ok(())