Put the documents MTBL back into LMDB

We makes sure to write the documents into a file before
memory mapping it and putting it into LMDB, this way we avoid
moving it to RAM
This commit is contained in:
Clément Renault 2020-08-28 15:38:05 +02:00
parent d784d87880
commit 0a44ff86ab
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
10 changed files with 100 additions and 110 deletions

View File

@ -63,8 +63,5 @@ $('#docs-count').text(function(index, text) {
// Make the database a little bit easier to read
$('#db-size').text(function(index, text) {
let arr = text.split("+");
let database_size = filesize(parseInt(arr[0]));
let documents_size = filesize(parseInt(arr[1]));
return `${database_size} + ${documents_size}`
return filesize(parseInt(text))
});

View File

@ -1,5 +1,5 @@
use std::convert::{TryFrom, TryInto};
use std::fs::{File, OpenOptions};
use std::fs::File;
use std::io::{self, Read, Write};
use std::iter::FromIterator;
use std::path::PathBuf;
@ -14,7 +14,7 @@ use flate2::read::GzDecoder;
use fst::IntoStreamer;
use heed::EnvOpenOptions;
use heed::types::*;
use log::debug;
use log::{debug, info};
use memmap::Mmap;
use oxidized_mtbl::{Reader, Writer, Merger, Sorter, CompressionType};
use rayon::prelude::*;
@ -486,9 +486,9 @@ fn main() -> anyhow::Result<()> {
.max_dbs(10)
.open(&opt.database)?;
let mut index = Index::new(&env, &opt.database)?;
let before_indexing = Instant::now();
let index = Index::new(&env)?;
let documents_path = opt.database.join("documents.mtbl");
let num_threads = rayon::current_num_threads();
let arc_cache_size = opt.indexer.arc_cache_size;
let max_nb_chunks = opt.indexer.max_nb_chunks;
@ -566,32 +566,28 @@ fn main() -> anyhow::Result<()> {
docs_stores.push(d);
});
debug!("We are writing into LMDB and MTBL...");
debug!("We are writing the documents into MTBL on disk...");
// We also merge the documents into its own MTBL store.
let file = tempfile::tempfile()?;
let mut writer = Writer::builder()
.compression_type(documents_compression_type)
.compression_level(documents_compression_level)
.build(file);
let mut builder = Merger::builder(docs_merge);
builder.extend(docs_stores);
builder.build().write_into(&mut writer)?;
let file = writer.into_inner()?;
let documents_mmap = unsafe { memmap::Mmap::map(&file)? };
// We run both merging steps in parallel.
let (lmdb, mtbl) = rayon::join(|| {
// We merge the postings lists into LMDB.
let mut wtxn = env.write_txn()?;
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
Ok(wtxn.commit()?) as anyhow::Result<_>
}, || {
// We also merge the documents into its own MTBL store.
let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(documents_path)?;
let mut writer = Writer::builder()
.compression_type(documents_compression_type)
.compression_level(documents_compression_level)
.build(file);
let mut builder = Merger::builder(docs_merge);
builder.extend(docs_stores);
builder.build().write_into(&mut writer)?;
Ok(writer.finish()?) as anyhow::Result<_>
});
debug!("We are writing the postings lists and documents into LMDB on disk...");
// We merge the postings lists into LMDB.
let mut wtxn = env.write_txn()?;
merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
index.put_documents(&mut wtxn, &documents_mmap)?;
let count = index.number_of_documents(&wtxn)?;
wtxn.commit()?;
lmdb.and(mtbl)?;
index.refresh_documents()?;
let count = index.number_of_documents();
debug!("Wrote {} documents into LMDB", count);
info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed());
Ok(())
}

View File

@ -96,7 +96,7 @@ fn main() -> anyhow::Result<()> {
.open(&opt.database)?;
// Open the LMDB database.
let index = Index::new(&env, opt.database)?;
let index = Index::new(&env)?;
let rtxn = env.read_txn()?;
match opt.command {
@ -200,6 +200,11 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho
if heap.len() > limit { heap.pop(); }
}
if let Some(documents) = index.main.get::<_, ByteSlice, ByteSlice>(rtxn, b"documents")? {
heap.push(Reverse((documents.len(), format!("documents"), main_name)));
if heap.len() > limit { heap.pop(); }
}
for result in index.word_positions.as_polymorph().iter::<_, Str, ByteSlice>(rtxn)? {
let (word, value) = result?;
heap.push(Reverse((value.len(), word.to_string(), word_positions_name)));

View File

@ -49,7 +49,7 @@ fn main() -> anyhow::Result<()> {
.open(&opt.database)?;
// Open the LMDB database.
let index = Index::new(&env, opt.database)?;
let index = Index::new(&env)?;
let rtxn = env.read_txn()?;
let stdin = io::stdin();
@ -68,7 +68,7 @@ fn main() -> anyhow::Result<()> {
Some(headers) => headers,
None => return Ok(()),
};
let documents = index.documents(result.documents_ids.iter().cloned())?;
let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?;
let mut stdout = io::stdout();
stdout.write_all(&headers)?;

View File

@ -62,7 +62,6 @@ fn highlight_string(string: &str, words: &HashSet<String>) -> String {
struct IndexTemplate {
db_name: String,
db_size: usize,
docs_size: usize,
docs_count: usize,
}
@ -83,28 +82,23 @@ async fn main() -> anyhow::Result<()> {
.open(&opt.database)?;
// Open the LMDB database.
let index = Index::new(&env, &opt.database)?;
let index = Index::new(&env)?;
// Retrieve the database the file stem (w/o the extension),
// the disk file size and the number of documents in the database.
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
let docs_size = File::open(opt.database.join("documents.mtbl"))?.metadata()?.len() as usize;
let docs_count = index.number_of_documents();
let rtxn = env.read_txn()?;
let docs_count = index.number_of_documents(&rtxn)? as usize;
drop(rtxn);
// We run and wait on the HTTP server
// Expose an HTML page to debug the search in a browser
let dash_html_route = warp::filters::method::get()
.and(warp::filters::path::end())
.map(move || {
IndexTemplate {
db_name: db_name.clone(),
db_size,
docs_size,
docs_count: docs_count as usize,
}
});
.map(move || IndexTemplate { db_name: db_name.clone(), db_size, docs_count });
let dash_bulma_route = warp::filters::method::get()
.and(warp::path!("bulma.min.css"))
@ -192,7 +186,7 @@ async fn main() -> anyhow::Result<()> {
if let Some(headers) = index.headers(&rtxn).unwrap() {
// We write the headers
body.extend_from_slice(headers);
let documents = index.documents(documents_ids).unwrap();
let documents = index.documents(&rtxn, documents_ids).unwrap();
for (_id, content) in documents {
let content = std::str::from_utf8(content.as_ref()).unwrap();

View File

@ -1,5 +1,7 @@
mod mtbl_codec;
mod roaring_bitmap_codec;
mod str_beu32_codec;
pub use self::mtbl_codec::MtblCodec;
pub use self::roaring_bitmap_codec::RoaringBitmapCodec;
pub use self::str_beu32_codec::StrBEU32Codec;

View File

@ -0,0 +1,20 @@
use std::borrow::Cow;
use oxidized_mtbl::Reader;
pub struct MtblCodec;
impl<'a> heed::BytesDecode<'a> for MtblCodec {
type DItem = Reader<&'a [u8]>;
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
Reader::new(bytes).ok()
}
}
impl heed::BytesEncode<'_> for MtblCodec {
type EItem = [u8];
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
Some(Cow::Borrowed(item))
}
}

View File

@ -2,27 +2,20 @@ mod criterion;
mod node;
mod query_tokens;
mod search;
mod transitive_arc;
pub mod heed_codec;
pub mod lexer;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::hash::BuildHasherDefault;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Context;
use anyhow::{bail, Context};
use fxhash::{FxHasher32, FxHasher64};
use heed::types::*;
use heed::{PolyDatabase, Database};
use memmap::Mmap;
use oxidized_mtbl as omtbl;
pub use self::search::{Search, SearchResult};
pub use self::criterion::{Criterion, default_criteria};
use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec};
use self::transitive_arc::TransitiveArc;
use self::heed_codec::{MtblCodec, RoaringBitmapCodec, StrBEU32Codec};
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
@ -34,10 +27,12 @@ pub type DocumentId = u32;
pub type Attribute = u32;
pub type Position = u32;
const WORDS_FST_KEY: &str = "words-fst";
const HEADERS_KEY: &str = "headers";
const DOCUMENTS_KEY: &str = "documents";
#[derive(Clone)]
pub struct Index {
// The database path, where the LMDB and MTBL files are.
path: PathBuf,
/// Contains many different types (e.g. the documents CSV headers).
pub main: PolyDatabase,
/// A word and all the positions where it appears in the whole dataset.
@ -46,44 +41,24 @@ pub struct Index {
pub word_position_docids: Database<StrBEU32Codec, RoaringBitmapCodec>,
/// Maps a word and an attribute (u32) to all the documents ids where the given word appears.
pub word_attribute_docids: Database<StrBEU32Codec, RoaringBitmapCodec>,
/// The MTBL store that contains the documents content.
documents: omtbl::Reader<TransitiveArc<Mmap>>,
}
impl Index {
pub fn new<P: AsRef<Path>>(env: &heed::Env, path: P) -> anyhow::Result<Index> {
let documents_path = path.as_ref().join("documents.mtbl");
let mut documents = OpenOptions::new().create(true).write(true).read(true).open(documents_path)?;
// If the file is empty we must initialize it like an empty MTBL database.
if documents.metadata()?.len() == 0 {
omtbl::Writer::new(&mut documents).finish()?;
}
let documents = unsafe { memmap::Mmap::map(&documents)? };
pub fn new(env: &heed::Env) -> anyhow::Result<Index> {
Ok(Index {
path: path.as_ref().to_path_buf(),
main: env.create_poly_database(None)?,
word_positions: env.create_database(Some("word-positions"))?,
word_position_docids: env.create_database(Some("word-position-docids"))?,
word_attribute_docids: env.create_database(Some("word-attribute-docids"))?,
documents: omtbl::Reader::new(TransitiveArc(Arc::new(documents)))?,
})
}
pub fn refresh_documents(&mut self) -> anyhow::Result<()> {
let documents_path = self.path.join("documents.mtbl");
let documents = File::open(&documents_path)?;
let documents = unsafe { memmap::Mmap::map(&documents)? };
self.documents = omtbl::Reader::new(TransitiveArc(Arc::new(documents)))?;
Ok(())
}
pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> {
Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers)?)
Ok(self.main.put::<_, Str, ByteSlice>(wtxn, HEADERS_KEY, headers)?)
}
pub fn headers<'t>(&self, rtxn: &'t heed::RoTxn) -> heed::Result<Option<&'t [u8]>> {
self.main.get::<_, Str, ByteSlice>(rtxn, "headers")
self.main.get::<_, Str, ByteSlice>(rtxn, HEADERS_KEY)
}
pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
@ -98,29 +73,46 @@ impl Index {
}
pub fn put_fst<A: AsRef<[u8]>>(&self, wtxn: &mut heed::RwTxn, fst: &fst::Set<A>) -> anyhow::Result<()> {
Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", fst.as_fst().as_bytes())?)
Ok(self.main.put::<_, Str, ByteSlice>(wtxn, WORDS_FST_KEY, fst.as_fst().as_bytes())?)
}
pub fn fst<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<fst::Set<&'t [u8]>>> {
match self.main.get::<_, Str, ByteSlice>(rtxn, "words-fst")? {
match self.main.get::<_, Str, ByteSlice>(rtxn, WORDS_FST_KEY)? {
Some(bytes) => Ok(Some(fst::Set::new(bytes)?)),
None => Ok(None),
}
}
/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
pub fn documents<I: IntoIterator<Item=DocumentId>>(&self, iter: I) -> anyhow::Result<Vec<(DocumentId, Vec<u8>)>> {
iter.into_iter().map(|id| {
let key = id.to_be_bytes();
let content = self.documents.clone().get(&key)?.with_context(|| format!("Could not find document {}.", id))?;
Ok((id, content.as_ref().to_vec()))
})
.collect()
pub fn documents<'t>(
&self,
rtxn: &'t heed::RoTxn,
iter: impl IntoIterator<Item=DocumentId>,
) -> anyhow::Result<Vec<(DocumentId, Vec<u8>)>>
{
match self.main.get::<_, Str, MtblCodec>(rtxn, DOCUMENTS_KEY)? {
Some(documents) => {
iter.into_iter().map(|id| {
let key = id.to_be_bytes();
let content = documents.clone().get(&key)?
.with_context(|| format!("Could not find document {}", id))?;
Ok((id, content.as_ref().to_vec()))
}).collect()
},
None => bail!("No documents database found"),
}
}
pub fn put_documents(&self, wtxn: &mut heed::RwTxn, documents: &[u8]) -> anyhow::Result<()> {
Ok(self.main.put::<_, Str, MtblCodec>(wtxn, DOCUMENTS_KEY, documents)?)
}
/// Returns the number of documents indexed in the database.
pub fn number_of_documents(&self) -> usize {
self.documents.metadata().count_entries as usize
pub fn number_of_documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<usize> {
match self.main.get::<_, Str, MtblCodec>(rtxn, DOCUMENTS_KEY)? {
Some(documents) => Ok(documents.metadata().count_entries as usize),
None => return Ok(0),
}
}
pub fn search<'a>(&'a self, rtxn: &'a heed::RoTxn) -> Search<'a> {

View File

@ -1,16 +0,0 @@
use std::sync::Arc;
/// An `Arc<[u8]>` that is transitive over `AsRef<[u8]>`.
pub struct TransitiveArc<T>(pub Arc<T>);
impl<T: AsRef<[u8]>> AsRef<[u8]> for TransitiveArc<T> {
fn as_ref(&self) -> &[u8] {
self.0.as_ref().as_ref()
}
}
impl<T> Clone for TransitiveArc<T> {
fn clone(&self) -> TransitiveArc<T> {
TransitiveArc(self.0.clone())
}
}

View File

@ -34,7 +34,7 @@
<div class="level-item has-text-centered">
<div>
<p class="heading">Database Size</p>
<p class="title" id="db-size">{{ db_size }} + {{ docs_size }}</p>
<p class="title" id="db-size">{{ db_size }}</p>
</div>
</div>
<div class="level-item has-text-centered">