From 4873abe14598c8fdf8328f74c082424a3a0373a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 5 Aug 2020 12:10:41 +0200
Subject: [PATCH] Introduce option flags to toggle the indexing engine

---
 src/bin/indexer.rs | 58 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 42 insertions(+), 16 deletions(-)

diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index a03b1741a..64572f577 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -52,6 +52,14 @@ struct Opt {
     #[structopt(short, long)]
     jobs: Option<usize>,
 
+    /// MTBL max number of chunks.
+    #[structopt(long)]
+    max_nb_chunks: Option<usize>,
+
+    /// MTBL max memory in bytes.
+    #[structopt(long)]
+    max_memory: Option<usize>,
+
     /// Verbose mode (-v, -vv, -vvv, etc.)
     #[structopt(short, long, parse(from_occurrences))]
     verbose: usize,
@@ -60,6 +68,10 @@ struct Opt {
     csv_file: Option<PathBuf>,
 }
 
+fn lmdb_key_valid_size(key: &[u8]) -> bool {
+    !key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH
+}
+
 type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
 
 struct Store {
@@ -69,15 +81,23 @@ struct Store {
 }
 
 impl Store {
-    fn new() -> Store {
-        let sorter = Sorter::builder(merge as MergeFn)
-            .chunk_compression_type(CompressionType::Snappy)
-            .build();
+    fn new(max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store {
+        let mut builder = Sorter::builder(merge as MergeFn);
+
+        builder.chunk_compression_type(CompressionType::Snappy);
+
+        if let Some(nb_chunks) = max_nb_chunks {
+            builder.max_nb_chunks(nb_chunks);
+        }
+
+        if let Some(memory) = max_memory {
+            builder.max_memory(memory);
+        }
 
         Store {
             word_positions: ArcCache::new(65_535),
             word_position_docids: ArcCache::new(65_535),
-            sorter,
+            sorter: builder.build(),
         }
     }
 
@@ -126,7 +146,9 @@ impl Store {
             buffer.clear();
             positions.serialize_into(&mut buffer)?;
             // that we write under the generated key into MTBL
-            sorter.insert(&key, &buffer)?;
+            if lmdb_key_valid_size(&key) {
+                sorter.insert(&key, &buffer)?;
+            }
         }
 
         Ok(())
@@ -149,7 +171,9 @@ impl Store {
             buffer.clear();
            ids.serialize_into(&mut buffer)?;
             // that we write under the generated key into MTBL
-            sorter.insert(&key, &buffer)?;
+            if lmdb_key_valid_size(&key) {
+                sorter.insert(&key, &buffer)?;
+            }
             // And cleanup the position afterward
             key.truncate(key.len() - position_bytes.len());
         }
@@ -288,11 +312,13 @@
 fn index_csv(
     mut rdr: csv::Reader<File>,
     thread_index: usize,
     num_threads: usize,
+    max_nb_chunks: Option<usize>,
+    max_memory: Option<usize>,
 ) -> anyhow::Result<Reader<Mmap>> {
     debug!("{:?}: Indexing into an Indexed...", thread_index);
 
-    let mut store = Store::new();
+    let mut store = Store::new(max_nb_chunks, max_memory);
 
     // Write the headers into a Vec of bytes and then into the store.
     let headers = rdr.headers()?;
@@ -310,19 +336,16 @@
         if document_id % num_threads != thread_index { continue }
 
         let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
-
         if document_id % (ONE_MILLION as u32) == 0 {
             debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32);
         }
 
         for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
             for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
-                if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH {
-                    let word = word.cow_to_lowercase();
-                    let position = (attr * MAX_POSITION + pos) as u32;
-                    store.insert_word_position(&word, position)?;
-                    store.insert_word_position_docid(&word, position, document_id)?;
-                }
+                let word = word.cow_to_lowercase();
+                let position = (attr * MAX_POSITION + pos) as u32;
+                store.insert_word_position(&word, position)?;
+                store.insert_word_position_docid(&word, position, document_id)?;
             }
         }
 
@@ -407,6 +430,9 @@ fn main() -> anyhow::Result<()> {
     let num_threads = rayon::current_num_threads();
 
+    let max_nb_chunks = opt.max_nb_chunks;
+    let max_memory = opt.max_memory;
+
     // We duplicate the file # jobs times.
     let file = opt.csv_file.unwrap();
     let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::<Result<_, _>>()?;
@@ -414,7 +440,7 @@
     let stores: Vec<_> = csv_readers
         .into_par_iter()
         .enumerate()
-        .map(|(i, rdr)| index_csv(rdr, i, num_threads))
+        .map(|(i, rdr)| index_csv(rdr, i, num_threads, max_nb_chunks, max_memory))
         .collect::<Result<_, _>>()?;
 
     debug!("We are writing into LMDB...");
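
With these flags the MTBL sorter's chunk count and memory budget become
tunable per run instead of being fixed at compile time. As a usage sketch
(the corpus path and the sizes below are made-up values, the binary is
assumed to be launched through cargo, and the kebab-case flag names follow
structopt's default renaming of the new fields):

    $ cargo run --release --bin indexer -- \
          --max-nb-chunks 32 \
          --max-memory 1073741824 \
          documents.csv

Here 1073741824 bytes is a 1 GiB budget. Both flags stay optional:
Store::new forwards each one to the Sorter builder only when it is Some,
so omitting them keeps the builder's defaults, exactly as before this
patch.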