diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index ef873c966..ebe29f885 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -1,6 +1,6 @@
 use std::convert::TryInto;
 use std::convert::TryFrom;
-use std::fs::File;
+use std::fs::{File, OpenOptions};
 use std::io::{self, Read, Write};
 use std::iter::FromIterator;
 use std::path::PathBuf;
@@ -31,7 +31,6 @@
 const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
 const HEADERS_KEY: &[u8] = b"\0headers";
 const WORDS_FST_KEY: &[u8] = b"\x05words-fst";
-const DOCUMENTS_KEY: &[u8] = b"\x06documents";
 const WORD_POSITIONS_BYTE: u8 = 1;
 const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
 const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
@@ -220,7 +219,7 @@ impl Store {
         Ok(())
     }
 
-    pub fn finish(mut self) -> anyhow::Result<Reader<Mmap>> {
+    pub fn finish(mut self) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> {
         Self::write_word_positions(&mut self.sorter, self.word_positions)?;
         Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
         Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?;
@@ -246,13 +245,13 @@
         self.documents_sorter.write_into(&mut docs_wtr)?;
         let docs_file = docs_wtr.into_inner()?;
         let docs_mmap = unsafe { Mmap::map(&docs_file)? };
-        wtr.insert(DOCUMENTS_KEY, docs_mmap)?;
+        let docs_reader = Reader::new(docs_mmap)?;
 
         let file = wtr.into_inner()?;
         let mmap = unsafe { Mmap::map(&file)? };
         let reader = Reader::new(mmap)?;
 
-        Ok(reader)
+        Ok((reader, docs_reader))
     }
 }
 
@@ -280,20 +279,6 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
             assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
             Ok(values[0].to_vec())
         },
-        DOCUMENTS_KEY => {
-            let sources: Vec<_> = values.iter().map(Reader::new).collect::<Result<_, _>>().unwrap();
-
-            let mut builder = Merger::builder(docs_merge);
-            builder.extend(sources);
-            let merger = builder.build();
-
-            let mut builder = Writer::builder();
-            builder.compression_type(CompressionType::Snappy);
-
-            let mut wtr = builder.memory();
-            merger.write_into(&mut wtr).unwrap();
-            Ok(wtr.into_inner().unwrap())
-        },
         key => match key[0] {
             WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
                 let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
@@ -323,10 +308,6 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) ->
         // Write the headers
         index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
     }
-    else if key == DOCUMENTS_KEY {
-        // Write the documents
-        index.main.put::<_, Str, ByteSlice>(wtxn, "documents", val)?;
-    }
     else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
         // Write the postings lists
         index.word_positions.as_polymorph()
@@ -373,7 +354,7 @@ fn index_csv(
     arc_cache_size: Option<usize>,
     max_nb_chunks: Option<usize>,
     max_memory: Option<usize>,
-) -> anyhow::Result<Reader<Mmap>>
+) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)>
 {
     debug!("{:?}: Indexing into a Store...", thread_index);
 
@@ -418,9 +399,9 @@
         store.write_document(document_id, &document)?;
     }
 
-    let reader = store.finish()?;
+    let (reader, docs_reader) = store.finish()?;
     debug!("{:?}: Store created!", thread_index);
-    Ok(reader)
+    Ok((reader, docs_reader))
 }
 
 fn main() -> anyhow::Result<()> {
@@ -441,7 +422,7 @@
         .map_size(100 * 1024 * 1024 * 1024) // 100 GB
         .max_readers(10)
         .max_dbs(10)
-        .open(opt.database)?;
+        .open(&opt.database)?;
 
     let index = Index::new(&env)?;
 
@@ -488,17 +469,37 @@
         },
     };
 
-    let stores = csv_readers
+    let readers = csv_readers
         .into_par_iter()
         .enumerate()
         .map(|(i, rdr)| index_csv(rdr, i, num_threads, arc_cache_size, max_nb_chunks, max_memory))
-        .collect::<Result<_, _>>()?;
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let mut stores = Vec::with_capacity(readers.len());
+    let mut docs_stores = Vec::with_capacity(readers.len());
+
+    readers.into_iter().for_each(|(s, d)| {
+        stores.push(s);
+        docs_stores.push(d);
+    });
 
     debug!("We are writing into LMDB...");
     let mut wtxn = env.write_txn()?;
+
+    // We merge the postings lists into LMDB.
     merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
-    let count = index.documents(&wtxn)?.unwrap().metadata().count_entries;
+
+    // We also merge the documents into its own MTBL store.
+    let path = opt.database.join("documents.mtbl");
+    let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(path)?;
+    let mut writer = Writer::builder().compression_type(CompressionType::Snappy).build(file);
+    let mut builder = Merger::builder(docs_merge);
+    builder.extend(docs_stores);
+    builder.build().write_into(&mut writer)?;
+    let file = writer.into_inner()?;
+    let mmap = unsafe { Mmap::map(&file)? };
+    let documents = Reader::new(&mmap)?;
+    let count = documents.metadata().count_entries;
 
     wtxn.commit()?;
 
     debug!("Wrote {} documents into LMDB", count);
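
A note on the indexer change above: `Store::finish` now returns a pair of readers, and `main` merges each half along its own path. The postings lists still flow through `merge` and `lmdb_writer` into LMDB, while the per-thread documents stores are merged once into a standalone `documents.mtbl` file living next to the LMDB data. The sketch below isolates that second path as a hypothetical `merge_documents_stores` helper; it relies on indexer.rs's existing imports and its `docs_merge` function, and the `u64` return type is an assumption about `count_entries`.

// Hypothetical helper: merge every thread's documents store into a single
// Snappy-compressed MTBL file on disk, mirroring the hunk in main() above.
fn merge_documents_stores(docs_stores: Vec<Reader<Mmap>>, path: PathBuf) -> anyhow::Result<u64> {
    let file = OpenOptions::new()
        .create(true).truncate(true).write(true).read(true)
        .open(path)?;

    // Entries are streamed into the file as the merger yields them, so the
    // whole documents set never sits in memory at once.
    let mut writer = Writer::builder()
        .compression_type(CompressionType::Snappy)
        .build(file);

    let mut builder = Merger::builder(docs_merge);
    builder.extend(docs_stores);
    builder.build().write_into(&mut writer)?;

    // Reopen the finished file to read the merged entries count back.
    let file = writer.into_inner()?;
    let mmap = unsafe { Mmap::map(&file)? };
    let documents = Reader::new(&mmap)?;
    Ok(documents.metadata().count_entries)
}
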
diff --git a/src/bin/search.rs b/src/bin/search.rs
index 832013d2a..6bd29230f 100644
--- a/src/bin/search.rs
+++ b/src/bin/search.rs
@@ -1,3 +1,4 @@
+use std::fs::File;
 use std::io::{self, Write, BufRead};
 use std::iter::once;
 use std::path::PathBuf;
@@ -6,6 +7,7 @@ use std::time::Instant;
 use heed::EnvOpenOptions;
 use log::debug;
 use milli::Index;
+use oxidized_mtbl::Reader;
 use structopt::StructOpt;
 
 #[cfg(target_os = "linux")]
@@ -42,10 +44,17 @@
         .map_size(100 * 1024 * 1024 * 1024) // 100 GB
         .max_readers(10)
         .max_dbs(10)
-        .open(opt.database)?;
+        .open(&opt.database)?;
 
+    // Open the LMDB database.
     let index = Index::new(&env)?;
 
+    // Open the documents MTBL database.
+    let path = opt.database.join("documents.mtbl");
+    let file = File::open(path)?;
+    let mmap = unsafe { memmap::Mmap::map(&file)? };
+    let documents = Reader::new(mmap.as_ref())?;
+
     let rtxn = env.read_txn()?;
 
     let stdin = io::stdin();
@@ -67,7 +76,6 @@
             let mut stdout = io::stdout();
             stdout.write_all(&headers)?;
 
-            let documents = index.documents(&rtxn)?.unwrap();
             for id in &documents_ids {
                 let id_bytes = id.to_be_bytes();
                 if let Some(content) = documents.clone().get(&id_bytes)? {
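
In search.rs the documents now come from that standalone `documents.mtbl` file instead of a reader stored inside LMDB. Lookups use the big-endian bytes of the document id as the key, which keeps the MTBL's lexicographic key order aligned with numeric id order. Below is a hypothetical `write_documents` helper mirroring the loop above, assuming the `Reader::get` signature and `Clone` behaviour used in this file (`get` appears to consume the reader, hence the cheap clone of the borrowed-slice reader before each lookup):

// Hypothetical helper, relying on search.rs's existing imports.
fn write_documents(documents: &Reader<&[u8]>, documents_ids: &[u32]) -> anyhow::Result<()> {
    let mut stdout = io::stdout();
    for id in documents_ids {
        // Big-endian keys: lexicographic MTBL order == numeric id order.
        let id_bytes = id.to_be_bytes();
        if let Some(content) = documents.clone().get(&id_bytes)? {
            stdout.write_all(content.as_ref())?;
        }
    }
    Ok(())
}
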
diff --git a/src/bin/serve.rs b/src/bin/serve.rs
index 9554a6c2f..5c2f9bf5d 100644
--- a/src/bin/serve.rs
+++ b/src/bin/serve.rs
@@ -4,10 +4,12 @@ use std::fs::File;
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::str::FromStr;
+use std::sync::Arc;
 use std::time::Instant;
 
 use askama_warp::Template;
 use heed::EnvOpenOptions;
+use oxidized_mtbl::Reader;
 use serde::Deserialize;
 use slice_group_by::StrGroupBy;
 use structopt::StructOpt;
@@ -57,6 +59,21 @@ fn highlight_string(string: &str, words: &HashSet<String>) -> String {
     output
 }
 
+// TODO find a better way or move this elsewhere
+struct TransitiveArc<T>(Arc<T>);
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for TransitiveArc<T> {
+    fn as_ref(&self) -> &[u8] {
+        self.0.as_ref().as_ref()
+    }
+}
+
+impl<T> Clone for TransitiveArc<T> {
+    fn clone(&self) -> TransitiveArc<T> {
+        TransitiveArc(self.0.clone())
+    }
+}
+
 #[derive(Template)]
 #[template(path = "index.html")]
 struct IndexTemplate {
@@ -81,13 +98,23 @@
         .max_dbs(10)
         .open(&opt.database)?;
 
+    // Open the LMDB database.
     let index = Index::new(&env)?;
 
+    // Open the documents MTBL database.
+    let path = opt.database.join("documents.mtbl");
+    let file = File::open(path)?;
+    let mmap = unsafe { memmap::Mmap::map(&file)? };
+    let mmap = TransitiveArc(Arc::new(mmap));
+    let documents = Reader::new(mmap)?;
+
     // Retrieve the database the file stem (w/o the extension),
     // the disk file size and the number of documents in the database.
     let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
     let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
-    let docs_count = env.read_txn().and_then(|r| Ok(index.documents(&r).unwrap().unwrap().metadata().count_entries))?;
+
+    // Retrieve the documents count.
+    let docs_count = documents.metadata().count_entries;
 
     // We run and wait on the HTTP server
@@ -171,6 +198,7 @@
     }
 
     let env_cloned = env.clone();
+    let documents_cloned = documents.clone();
     let disable_highlighting = opt.disable_highlighting;
     let query_route = warp::filters::method::post()
         .and(warp::path!("query"))
@@ -185,11 +213,10 @@
             if let Some(headers) = index.headers(&rtxn).unwrap() {
                 // We write the headers
                 body.extend_from_slice(headers);
 
-                let documents = index.documents(&rtxn).unwrap().unwrap();
                 for id in documents_ids {
                     let id_bytes = id.to_be_bytes();
-                    let content = documents.clone().get(&id_bytes).unwrap();
+                    let content = documents_cloned.clone().get(&id_bytes).unwrap();
                     let content = content.expect(&format!("could not find document {}", id));
 
                     let content = std::str::from_utf8(content.as_ref()).unwrap();
diff --git a/src/lib.rs b/src/lib.rs
index 40a8b9a03..fc7a4b7c9 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,7 +16,6 @@ use heed::{PolyDatabase, Database};
 use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder;
 use log::debug;
 use once_cell::sync::Lazy;
-use oxidized_mtbl::Reader;
 use roaring::RoaringBitmap;
 
 use self::best_proximity::BestProximity;
@@ -72,13 +71,6 @@ impl Index {
         self.main.get::<_, Str, ByteSlice>(rtxn, "headers")
     }
 
-    pub fn documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<Reader<&'t [u8]>>> {
-        match self.main.get::<_, Str, ByteSlice>(rtxn, "documents")? {
-            Some(bytes) => Ok(Some(Reader::new(bytes)?)),
-            None => Ok(None),
-        }
-    }
-
     pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
         match self.headers(rtxn)? {
             Some(headers) => {
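
Finally, why serve.rs needs `TransitiveArc` at all: the warp query route captures its data in a closure that must be `Clone` and `'static`; `Reader<B>` is only as cloneable as its byte container `B`; and `Arc<Mmap>` on its own does not satisfy the `B: AsRef<[u8]>` bound, since `Arc<T>` only provides `AsRef<T>`. The newtype forwards `AsRef` through both layers, so cloning the reader is just a reference-count bump. A self-contained sketch of the pattern, with a `Vec<u8>` standing in for the mmapped file:

use std::sync::Arc;

// Same shape as the TransitiveArc defined in serve.rs above.
struct TransitiveArc<T>(Arc<T>);

impl<T: AsRef<[u8]>> AsRef<[u8]> for TransitiveArc<T> {
    fn as_ref(&self) -> &[u8] {
        // Two hops: Arc<T> gives AsRef<T>, then T gives AsRef<[u8]>.
        self.0.as_ref().as_ref()
    }
}

// Clone is written by hand rather than derived: a derive would demand
// T: Clone, but only the Arc needs cloning (memmap's Mmap is not Clone).
impl<T> Clone for TransitiveArc<T> {
    fn clone(&self) -> TransitiveArc<T> {
        TransitiveArc(self.0.clone())
    }
}

fn main() {
    // A Vec<u8> stands in for the mmapped documents file.
    let bytes = TransitiveArc(Arc::new(vec![1u8, 2, 3]));
    let alias = bytes.clone(); // cheap: one atomic reference-count increment
    assert_eq!(bytes.as_ref(), alias.as_ref());
}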