From 5cc81a01793d3d71d4f933fd123e6aed982ae661 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 1 Jun 2020 18:39:58 +0200 Subject: [PATCH] Merge many MTBL into one at the same time --- src/bin/indexer.rs | 114 +++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 66 deletions(-) diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index eb5916091..270d24f9c 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -100,63 +100,57 @@ impl MtblKvStore { Ok(MtblKvStore(Some(out))) } - fn merge_with(self, other: MtblKvStore) -> anyhow::Result { - eprintln!("{:?}: Merging two MTBL stores...", rayon::current_thread_index()); + fn merge(key: &[u8], left: &[u8], right: &[u8]) -> Option> { + if key == b"\0words-fst" { + let left_fst = fst::Set::new(left).unwrap(); + let right_fst = fst::Set::new(right).unwrap(); - let (left, right) = match (self.0, other.0) { - (Some(left), Some(right)) => (left, right), - (Some(left), None) => return Ok(MtblKvStore(Some(left))), - (None, Some(right)) => return Ok(MtblKvStore(Some(right))), - (None, None) => return Ok(MtblKvStore(None)), - }; + // Union of the two FSTs + let op = fst::set::OpBuilder::new() + .add(left_fst.into_stream()) + .add(right_fst.into_stream()) + .r#union(); - let left = unsafe { memmap::Mmap::map(&left)? }; - let right = unsafe { memmap::Mmap::map(&right)? 
}; - - let left = Reader::new(&left, ReaderOptions::default()).unwrap(); - let right = Reader::new(&right, ReaderOptions::default()).unwrap(); - - fn merge(key: &[u8], left: &[u8], right: &[u8]) -> Option> { - if key == b"\0words-fst" { - let left_fst = fst::Set::new(left).unwrap(); - let right_fst = fst::Set::new(right).unwrap(); - - // Union of the two FSTs - let op = fst::set::OpBuilder::new() - .add(left_fst.into_stream()) - .add(right_fst.into_stream()) - .r#union(); - - let mut build = fst::SetBuilder::memory(); - build.extend_stream(op.into_stream()).unwrap(); - Some(build.into_inner().unwrap()) - } - else if key == b"\0headers" { - assert_eq!(left, right); - Some(left.to_vec()) - } - else if key.starts_with(&[1]) || key.starts_with(&[2]) { - let mut left = RoaringBitmap::deserialize_from(left).unwrap(); - let right = RoaringBitmap::deserialize_from(right).unwrap(); - left.union_with(&right); - let mut vec = Vec::new(); - left.serialize_into(&mut vec).unwrap(); - Some(vec) - } - else if key.starts_with(&[3]) { - assert_eq!(left, right); - Some(left.to_vec()) - } - else { - panic!("wut? {:?}", key) - } + let mut build = fst::SetBuilder::memory(); + build.extend_stream(op.into_stream()).unwrap(); + Some(build.into_inner().unwrap()) } + else if key == b"\0headers" { + assert_eq!(left, right); + Some(left.to_vec()) + } + else if key.starts_with(&[1]) || key.starts_with(&[2]) { + let mut left = RoaringBitmap::deserialize_from(left).unwrap(); + let right = RoaringBitmap::deserialize_from(right).unwrap(); + left.union_with(&right); + let mut vec = Vec::new(); + left.serialize_into(&mut vec).unwrap(); + Some(vec) + } + else if key.starts_with(&[3]) { + assert_eq!(left, right); + Some(left.to_vec()) + } + else { + panic!("wut? 
{:?}", key) + } + } + + fn from_many(stores: Vec) -> anyhow::Result { + eprintln!("{:?}: Merging {} MTBL stores...", rayon::current_thread_index(), stores.len()); + + let mmaps: Vec<_> = stores.iter().flat_map(|m| { + m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() }) + }).collect(); + + let sources = mmaps.iter().map(|mmap| { + Reader::new(&mmap, ReaderOptions::default()).unwrap() + }).collect(); let outfile = tempfile::tempfile()?; let mut out = Writer::new(outfile, None)?; - let sources = vec![left, right]; - let opt = MergerOptions { merge }; + let opt = MergerOptions { merge: MtblKvStore::merge }; let mut merger = Merger::new(sources, opt); let mut iter = merger.iter(); @@ -286,30 +280,18 @@ fn main() -> anyhow::Result<()> { let index = Index::new(&env)?; - let mut stores: Vec<_> = opt.files_to_index + let stores: Vec<_> = opt.files_to_index .into_par_iter() - .try_fold(MtblKvStore::default, |acc, path| { + .map(|path| { let rdr = csv::Reader::from_path(path)?; - let store = index_csv(rdr)?; - acc.merge_with(store) + index_csv(rdr) }) .inspect(|_| { eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) }) .collect::>()?; - while stores.len() >= 1 { - let s = std::mem::take(&mut stores); - stores = s.into_par_iter().chunks(2).map(|mut v| { - match (v.pop(), v.pop()) { - (Some(a), Some(b)) => a.merge_with(b), - (Some(a), _) => Ok(a), - _ => unreachable!(), - } - }).collect::>()?; - } - - let mtbl_store = stores.pop().unwrap_or_default(); + let mtbl_store = MtblKvStore::from_many(stores)?; eprintln!("We are writing into LMDB..."); let mut wtxn = env.write_txn()?;