Merge many MTBL into one a the same time

This commit is contained in:
Kerollmops 2020-06-01 18:39:58 +02:00
parent 6a047519f6
commit 5cc81a0179
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -100,63 +100,57 @@ impl MtblKvStore {
Ok(MtblKvStore(Some(out))) Ok(MtblKvStore(Some(out)))
} }
fn merge_with(self, other: MtblKvStore) -> anyhow::Result<MtblKvStore> { fn merge(key: &[u8], left: &[u8], right: &[u8]) -> Option<Vec<u8>> {
eprintln!("{:?}: Merging two MTBL stores...", rayon::current_thread_index()); if key == b"\0words-fst" {
let left_fst = fst::Set::new(left).unwrap();
let right_fst = fst::Set::new(right).unwrap();
let (left, right) = match (self.0, other.0) { // Union of the two FSTs
(Some(left), Some(right)) => (left, right), let op = fst::set::OpBuilder::new()
(Some(left), None) => return Ok(MtblKvStore(Some(left))), .add(left_fst.into_stream())
(None, Some(right)) => return Ok(MtblKvStore(Some(right))), .add(right_fst.into_stream())
(None, None) => return Ok(MtblKvStore(None)), .r#union();
};
let left = unsafe { memmap::Mmap::map(&left)? }; let mut build = fst::SetBuilder::memory();
let right = unsafe { memmap::Mmap::map(&right)? }; build.extend_stream(op.into_stream()).unwrap();
Some(build.into_inner().unwrap())
let left = Reader::new(&left, ReaderOptions::default()).unwrap();
let right = Reader::new(&right, ReaderOptions::default()).unwrap();
fn merge(key: &[u8], left: &[u8], right: &[u8]) -> Option<Vec<u8>> {
if key == b"\0words-fst" {
let left_fst = fst::Set::new(left).unwrap();
let right_fst = fst::Set::new(right).unwrap();
// Union of the two FSTs
let op = fst::set::OpBuilder::new()
.add(left_fst.into_stream())
.add(right_fst.into_stream())
.r#union();
let mut build = fst::SetBuilder::memory();
build.extend_stream(op.into_stream()).unwrap();
Some(build.into_inner().unwrap())
}
else if key == b"\0headers" {
assert_eq!(left, right);
Some(left.to_vec())
}
else if key.starts_with(&[1]) || key.starts_with(&[2]) {
let mut left = RoaringBitmap::deserialize_from(left).unwrap();
let right = RoaringBitmap::deserialize_from(right).unwrap();
left.union_with(&right);
let mut vec = Vec::new();
left.serialize_into(&mut vec).unwrap();
Some(vec)
}
else if key.starts_with(&[3]) {
assert_eq!(left, right);
Some(left.to_vec())
}
else {
panic!("wut? {:?}", key)
}
} }
else if key == b"\0headers" {
assert_eq!(left, right);
Some(left.to_vec())
}
else if key.starts_with(&[1]) || key.starts_with(&[2]) {
let mut left = RoaringBitmap::deserialize_from(left).unwrap();
let right = RoaringBitmap::deserialize_from(right).unwrap();
left.union_with(&right);
let mut vec = Vec::new();
left.serialize_into(&mut vec).unwrap();
Some(vec)
}
else if key.starts_with(&[3]) {
assert_eq!(left, right);
Some(left.to_vec())
}
else {
panic!("wut? {:?}", key)
}
}
fn from_many(stores: Vec<MtblKvStore>) -> anyhow::Result<MtblKvStore> {
eprintln!("{:?}: Merging {} MTBL stores...", rayon::current_thread_index(), stores.len());
let mmaps: Vec<_> = stores.iter().flat_map(|m| {
m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() })
}).collect();
let sources = mmaps.iter().map(|mmap| {
Reader::new(&mmap, ReaderOptions::default()).unwrap()
}).collect();
let outfile = tempfile::tempfile()?; let outfile = tempfile::tempfile()?;
let mut out = Writer::new(outfile, None)?; let mut out = Writer::new(outfile, None)?;
let sources = vec![left, right]; let opt = MergerOptions { merge: MtblKvStore::merge };
let opt = MergerOptions { merge };
let mut merger = Merger::new(sources, opt); let mut merger = Merger::new(sources, opt);
let mut iter = merger.iter(); let mut iter = merger.iter();
@ -286,30 +280,18 @@ fn main() -> anyhow::Result<()> {
let index = Index::new(&env)?; let index = Index::new(&env)?;
let mut stores: Vec<_> = opt.files_to_index let stores: Vec<_> = opt.files_to_index
.into_par_iter() .into_par_iter()
.try_fold(MtblKvStore::default, |acc, path| { .map(|path| {
let rdr = csv::Reader::from_path(path)?; let rdr = csv::Reader::from_path(path)?;
let store = index_csv(rdr)?; index_csv(rdr)
acc.merge_with(store)
}) })
.inspect(|_| { .inspect(|_| {
eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed))
}) })
.collect::<Result<_, _>>()?; .collect::<Result<_, _>>()?;
while stores.len() >= 1 { let mtbl_store = MtblKvStore::from_many(stores)?;
let s = std::mem::take(&mut stores);
stores = s.into_par_iter().chunks(2).map(|mut v| {
match (v.pop(), v.pop()) {
(Some(a), Some(b)) => a.merge_with(b),
(Some(a), _) => Ok(a),
_ => unreachable!(),
}
}).collect::<Result<_, _>>()?;
}
let mtbl_store = stores.pop().unwrap_or_default();
eprintln!("We are writing into LMDB..."); eprintln!("We are writing into LMDB...");
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;