Reintroduce appending sorted entries when possible

This commit is contained in:
Clément Renault 2022-02-22 17:28:57 +01:00 committed by Kerollmops
parent 382be56d36
commit 04b1bbf932
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -3,7 +3,7 @@ use std::fs::File;
use std::io::{self, Seek, SeekFrom};
use std::time::Instant;
use grenad::{CompressionType, MergerIter, Reader, Sorter};
use grenad::{CompressionType, Reader, Sorter};
use heed::types::ByteSlice;
use log::debug;
@ -209,36 +209,34 @@ pub fn sorter_into_lmdb_database(
debug!("Writing MTBL sorter...");
let before = Instant::now();
merger_iter_into_lmdb_database(wtxn, database, sorter.into_stream_merger_iter()?, merge)?;
debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
Ok(())
}
fn merger_iter_into_lmdb_database<R: io::Read + io::Seek>(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
mut merger_iter: MergerIter<R, MergeFn>,
merge: MergeFn,
) -> Result<()> {
while let Some((k, v)) = merger_iter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).map_err(|_| {
// TODO just wrap this error?
InternalError::IndexingMergingKeys { process: "get-put-merge" }
})?;
// safety: we don't keep references from inside the LMDB database.
unsafe { iter.put_current(k, &val)? };
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
let mut merger_iter = sorter.into_stream_merger_iter()?;
if database.is_empty(wtxn)? {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = merger_iter.next()? {
// safety: we don't keep references from inside the LMDB database.
unsafe { out_iter.append(k, v)? };
}
} else {
while let Some((k, v)) = merger_iter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).map_err(|_| {
// TODO just wrap this error?
InternalError::IndexingMergingKeys { process: "get-put-merge" }
})?;
// safety: we don't keep references from inside the LMDB database.
unsafe { iter.put_current(k, &val)? };
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
}
}
}
}
debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
Ok(())
}