Move the documents MTBL database inside the Index

This commit is contained in:
Clément Renault 2020-08-10 13:47:19 +02:00
parent ecd2b2f217
commit 394844062f
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
5 changed files with 75 additions and 40 deletions

View File

@ -424,7 +424,7 @@ fn main() -> anyhow::Result<()> {
.max_dbs(10) .max_dbs(10)
.open(&opt.database)?; .open(&opt.database)?;
let index = Index::new(&env)?; let mut index = Index::new(&env, &opt.database)?;
let documents_path = opt.database.join("documents.mtbl"); let documents_path = opt.database.join("documents.mtbl");
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
@ -499,13 +499,12 @@ fn main() -> anyhow::Result<()> {
let mut builder = Merger::builder(docs_merge); let mut builder = Merger::builder(docs_merge);
builder.extend(docs_stores); builder.extend(docs_stores);
builder.build().write_into(&mut writer)?; builder.build().write_into(&mut writer)?;
Ok(writer.into_inner()?) as anyhow::Result<_> Ok(writer.finish()?) as anyhow::Result<_>
}); });
let file = lmdb.and(mtbl)?; lmdb.and(mtbl)?;
let mmap = unsafe { Mmap::map(&file)? }; index.refresh_documents()?;
let documents = Reader::new(mmap)?; let count = index.number_of_documents();
let count = documents.metadata().count_entries;
debug!("Wrote {} documents into LMDB", count); debug!("Wrote {} documents into LMDB", count);

View File

@ -1,4 +1,3 @@
use std::fs::File;
use std::io::{self, Write, BufRead}; use std::io::{self, Write, BufRead};
use std::iter::once; use std::iter::once;
use std::path::PathBuf; use std::path::PathBuf;
@ -7,7 +6,6 @@ use std::time::Instant;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::debug; use log::debug;
use milli::Index; use milli::Index;
use oxidized_mtbl::Reader;
use structopt::StructOpt; use structopt::StructOpt;
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@ -47,14 +45,7 @@ fn main() -> anyhow::Result<()> {
.open(&opt.database)?; .open(&opt.database)?;
// Open the LMDB database. // Open the LMDB database.
let index = Index::new(&env)?; let index = Index::new(&env, opt.database)?;
// Open the documents MTBL database.
let path = opt.database.join("documents.mtbl");
let file = File::open(path)?;
let mmap = unsafe { memmap::Mmap::map(&file)? };
let documents = Reader::new(mmap.as_ref())?;
let rtxn = env.read_txn()?; let rtxn = env.read_txn()?;
let stdin = io::stdin(); let stdin = io::stdin();
@ -72,15 +63,13 @@ fn main() -> anyhow::Result<()> {
Some(headers) => headers, Some(headers) => headers,
None => return Ok(()), None => return Ok(()),
}; };
let documents = index.documents(documents_ids.iter().cloned())?;
let mut stdout = io::stdout(); let mut stdout = io::stdout();
stdout.write_all(&headers)?; stdout.write_all(&headers)?;
for id in &documents_ids { for (_id, content) in documents {
let id_bytes = id.to_be_bytes(); stdout.write_all(&content)?;
if let Some(content) = documents.clone().get(&id_bytes)? {
stdout.write_all(content.as_ref())?;
}
} }
debug!("Took {:.02?} to find {} documents", before.elapsed(), documents_ids.len()); debug!("Took {:.02?} to find {} documents", before.elapsed(), documents_ids.len());

View File

@ -9,7 +9,6 @@ use std::time::Instant;
use askama_warp::Template; use askama_warp::Template;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use oxidized_mtbl::Reader;
use serde::Deserialize; use serde::Deserialize;
use slice_group_by::StrGroupBy; use slice_group_by::StrGroupBy;
use structopt::StructOpt; use structopt::StructOpt;
@ -99,22 +98,13 @@ async fn main() -> anyhow::Result<()> {
.open(&opt.database)?; .open(&opt.database)?;
// Open the LMDB database. // Open the LMDB database.
let index = Index::new(&env)?; let index = Index::new(&env, &opt.database)?;
// Open the documents MTBL database.
let path = opt.database.join("documents.mtbl");
let file = File::open(path)?;
let mmap = unsafe { memmap::Mmap::map(&file)? };
let mmap = TransitiveArc(Arc::new(mmap));
let documents = Reader::new(mmap)?;
// Retrieve the database the file stem (w/o the extension), // Retrieve the database the file stem (w/o the extension),
// the disk file size and the number of documents in the database. // the disk file size and the number of documents in the database.
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize; let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
let docs_count = index.number_of_documents();
// Retrieve the documents count.
let docs_count = documents.metadata().count_entries;
// We run and wait on the HTTP server // We run and wait on the HTTP server
@ -198,7 +188,6 @@ async fn main() -> anyhow::Result<()> {
} }
let env_cloned = env.clone(); let env_cloned = env.clone();
let documents_cloned = documents.clone();
let disable_highlighting = opt.disable_highlighting; let disable_highlighting = opt.disable_highlighting;
let query_route = warp::filters::method::post() let query_route = warp::filters::method::post()
.and(warp::path!("query")) .and(warp::path!("query"))
@ -213,13 +202,10 @@ async fn main() -> anyhow::Result<()> {
if let Some(headers) = index.headers(&rtxn).unwrap() { if let Some(headers) = index.headers(&rtxn).unwrap() {
// We write the headers // We write the headers
body.extend_from_slice(headers); body.extend_from_slice(headers);
let documents = index.documents(documents_ids).unwrap();
for id in documents_ids { for (_id, content) in documents {
let id_bytes = id.to_be_bytes();
let content = documents_cloned.clone().get(&id_bytes).unwrap();
let content = content.expect(&format!("could not find document {}", id));
let content = std::str::from_utf8(content.as_ref()).unwrap(); let content = std::str::from_utf8(content.as_ref()).unwrap();
let content = if disable_highlighting { let content = if disable_highlighting {
Cow::from(content) Cow::from(content)
} else { } else {

View File

@ -2,12 +2,17 @@ mod best_proximity;
mod heed_codec; mod heed_codec;
mod iter_shortest_paths; mod iter_shortest_paths;
mod query_tokens; mod query_tokens;
mod transitive_arc;
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::{HashSet, HashMap}; use std::collections::{HashSet, HashMap};
use std::fs::{File, OpenOptions};
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use anyhow::Context;
use cow_utils::CowUtils; use cow_utils::CowUtils;
use fst::{IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
use fxhash::{FxHasher32, FxHasher64}; use fxhash::{FxHasher32, FxHasher64};
@ -15,12 +20,15 @@ use heed::types::*;
use heed::{PolyDatabase, Database}; use heed::{PolyDatabase, Database};
use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder; use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder;
use log::debug; use log::debug;
use memmap::Mmap;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use oxidized_mtbl as omtbl;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use self::best_proximity::BestProximity; use self::best_proximity::BestProximity;
use self::heed_codec::RoaringBitmapCodec; use self::heed_codec::RoaringBitmapCodec;
use self::query_tokens::{QueryTokens, QueryToken}; use self::query_tokens::{QueryTokens, QueryToken};
use self::transitive_arc::TransitiveArc;
// Building these factories is not free. // Building these factories is not free.
static LEVDIST0: Lazy<LevBuilder> = Lazy::new(|| LevBuilder::new(0, true)); static LEVDIST0: Lazy<LevBuilder> = Lazy::new(|| LevBuilder::new(0, true));
@ -39,6 +47,8 @@ pub type Position = u32;
#[derive(Clone)] #[derive(Clone)]
pub struct Index { pub struct Index {
// The database path, where the LMDB and MTBL files are.
path: PathBuf,
/// Contains many different types (e.g. the documents CSV headers). /// Contains many different types (e.g. the documents CSV headers).
pub main: PolyDatabase, pub main: PolyDatabase,
/// A word and all the positions where it appears in the whole dataset. /// A word and all the positions where it appears in the whole dataset.
@ -49,20 +59,40 @@ pub struct Index {
pub prefix_word_position_docids: Database<ByteSlice, RoaringBitmapCodec>, pub prefix_word_position_docids: Database<ByteSlice, RoaringBitmapCodec>,
/// Maps a word and an attribute (u32) to all the documents ids that it appears in. /// Maps a word and an attribute (u32) to all the documents ids that it appears in.
pub word_attribute_docids: Database<ByteSlice, RoaringBitmapCodec>, pub word_attribute_docids: Database<ByteSlice, RoaringBitmapCodec>,
/// The MTBL store that contains the documents content.
documents: omtbl::Reader<TransitiveArc<Mmap>>,
} }
impl Index { impl Index {
pub fn new(env: &heed::Env) -> heed::Result<Index> { pub fn new<P: AsRef<Path>>(env: &heed::Env, path: P) -> anyhow::Result<Index> {
let documents_path = path.as_ref().join("documents.mtbl");
let mut documents = OpenOptions::new().create(true).write(true).read(true).open(documents_path)?;
// If the file is empty we must initialize it like an empty MTBL database.
if documents.metadata()?.len() == 0 {
omtbl::Writer::new(&mut documents).finish()?;
}
let documents = unsafe { memmap::Mmap::map(&documents)? };
Ok(Index { Ok(Index {
path: path.as_ref().to_path_buf(),
main: env.create_poly_database(None)?, main: env.create_poly_database(None)?,
word_positions: env.create_database(Some("word-positions"))?, word_positions: env.create_database(Some("word-positions"))?,
prefix_word_positions: env.create_database(Some("prefix-word-positions"))?, prefix_word_positions: env.create_database(Some("prefix-word-positions"))?,
word_position_docids: env.create_database(Some("word-position-docids"))?, word_position_docids: env.create_database(Some("word-position-docids"))?,
prefix_word_position_docids: env.create_database(Some("prefix-word-position-docids"))?, prefix_word_position_docids: env.create_database(Some("prefix-word-position-docids"))?,
word_attribute_docids: env.create_database(Some("word-attribute-docids"))?, word_attribute_docids: env.create_database(Some("word-attribute-docids"))?,
documents: omtbl::Reader::new(TransitiveArc(Arc::new(documents)))?,
}) })
} }
pub fn refresh_documents(&mut self) -> anyhow::Result<()> {
let documents_path = self.path.join("documents.mtbl");
let documents = File::open(&documents_path)?;
let documents = unsafe { memmap::Mmap::map(&documents)? };
self.documents = omtbl::Reader::new(TransitiveArc(Arc::new(documents)))?;
Ok(())
}
pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> { pub fn put_headers(&self, wtxn: &mut heed::RwTxn, headers: &[u8]) -> anyhow::Result<()> {
Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers)?) Ok(self.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers)?)
} }
@ -93,6 +123,21 @@ impl Index {
} }
} }
/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
pub fn documents<I: IntoIterator<Item=DocumentId>>(&self, iter: I) -> anyhow::Result<Vec<(DocumentId, Vec<u8>)>> {
iter.into_iter().map(|id| {
let key = id.to_be_bytes();
let content = self.documents.clone().get(&key)?.with_context(|| format!("Could not find document {}.", id))?;
Ok((id, content.as_ref().to_vec()))
})
.collect()
}
/// Returns the number of documents indexed in the database.
pub fn number_of_documents(&self) -> usize {
self.documents.metadata().count_entries as usize
}
pub fn search(&self, rtxn: &heed::RoTxn, query: &str) -> anyhow::Result<(HashSet<String>, Vec<DocumentId>)> { pub fn search(&self, rtxn: &heed::RoTxn, query: &str) -> anyhow::Result<(HashSet<String>, Vec<DocumentId>)> {
let fst = match self.fst(rtxn)? { let fst = match self.fst(rtxn)? {
Some(fst) => fst, Some(fst) => fst,

16
src/transitive_arc.rs Normal file
View File

@ -0,0 +1,16 @@
use std::sync::Arc;
/// An `Arc<[u8]>` that is transitive over `AsRef<[u8]>`.
pub struct TransitiveArc<T>(pub Arc<T>);
impl<T: AsRef<[u8]>> AsRef<[u8]> for TransitiveArc<T> {
fn as_ref(&self) -> &[u8] {
self.0.as_ref().as_ref()
}
}
impl<T> Clone for TransitiveArc<T> {
fn clone(&self) -> TransitiveArc<T> {
TransitiveArc(self.0.clone())
}
}