From ada30c2789f1ac63c4050641202648c9ffc6e08c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Fri, 21 Aug 2020 16:41:26 +0200 Subject: [PATCH] Introducing more arguments to specify the different compression algorithms --- src/bin/indexer.rs | 108 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 16 deletions(-) diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index f9c5960ee..73c1ef334 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -40,7 +40,8 @@ const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3; static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; #[derive(Debug, StructOpt)] -#[structopt(name = "milli-indexer", about = "The indexer binary of the milli project.")] +#[structopt(name = "milli-indexer")] +/// The indexer binary of the milli project. struct Opt { /// The database path where the database is located. /// It is created if it doesn't already exist. @@ -56,6 +57,28 @@ struct Opt { #[structopt(short, long)] jobs: Option, + #[structopt(flatten)] + indexer: IndexerOpt, + + /// The name of the compression algorithm to use when compressing the final documents database. + #[structopt(long, default_value = "zlib", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] + documents_compression_type: String, + + /// The level of compression of the chosen algorithm. + #[structopt(long, default_value = "9")] + documents_compression_level: u32, + + /// Verbose mode (-v, -vv, -vvv, etc.) + #[structopt(short, long, parse(from_occurrences))] + verbose: usize, + + /// CSV file to index, if unspecified the CSV is read from standard input. + /// Note that it is much faster to index from a file. + csv_file: Option, +} + +#[derive(Debug, StructOpt)] +struct IndexerOpt { /// MTBL max number of chunks in bytes. #[structopt(long)] max_nb_chunks: Option, @@ -68,13 +91,27 @@ struct Opt { #[structopt(long)] arc_cache_size: Option, - /// Verbose mode (-v, -vv, -vvv, etc.) - #[structopt(short, long, parse(from_occurrences))] - verbose: usize, + /// The name of the compression algorithm to use when compressing intermediate + /// chunks during indexing documents. + /// + /// Choosing a fast algorithm will make the indexing faster but may consume more memory. + #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] + chunk_compression_type: String, - /// CSV file to index, if unspecified the CSV is read from standard input. - /// Note that it is much faster to index from a file. - csv_file: Option, + /// The level of compression of the chosen algorithm. + #[structopt(long, requires = "chunk-compression-type")] + chunk_compression_level: Option, +} + +fn compression_type_from_str(name: &str) -> CompressionType { + match name { + "snappy" => CompressionType::Snappy, + "zlib" => CompressionType::Zlib, + "lz4" => CompressionType::Lz4, + "lz4hc" => CompressionType::Lz4hc, + "zstd" => CompressionType::Zstd, + _ => panic!("invalid compression algorithm"), + } } fn lmdb_key_valid_size(key: &[u8]) -> bool { @@ -92,9 +129,19 @@ struct Store { } impl Store { - fn new(arc_cache_size: Option, max_nb_chunks: Option, max_memory: Option) -> Store { + fn new( + arc_cache_size: Option, + max_nb_chunks: Option, + max_memory: Option, + chunk_compression_type: CompressionType, + chunk_compression_level: Option, + ) -> Store + { let mut builder = Sorter::builder(merge as MergeFn); - builder.chunk_compression_type(CompressionType::Snappy); + builder.chunk_compression_type(chunk_compression_type); + if let Some(level) = chunk_compression_level { + builder.chunk_compression_level(level); + } if let Some(nb_chunks) = max_nb_chunks { builder.max_nb_chunks(nb_chunks); } @@ -103,7 +150,10 @@ impl Store { } let mut documents_builder = Sorter::builder(docs_merge as MergeFn); - documents_builder.chunk_compression_type(CompressionType::Snappy); + documents_builder.chunk_compression_type(chunk_compression_type); + if let Some(level) = chunk_compression_level { + builder.chunk_compression_level(level); + } let arc_cache_size = arc_cache_size.unwrap_or(65_535); @@ -354,11 +404,19 @@ fn index_csv( arc_cache_size: Option, max_nb_chunks: Option, max_memory: Option, + chunk_compression_type: CompressionType, + chunk_compression_level: Option, ) -> anyhow::Result<(Reader, Reader)> { debug!("{:?}: Indexing into a Store...", thread_index); - let mut store = Store::new(arc_cache_size, max_nb_chunks, max_memory); + let mut store = Store::new( + arc_cache_size, + max_nb_chunks, + max_memory, + chunk_compression_type, + chunk_compression_level, + ); // Write the headers into a Vec of bytes and then into the store. let headers = rdr.headers()?; @@ -427,9 +485,13 @@ fn main() -> anyhow::Result<()> { let documents_path = opt.database.join("documents.mtbl"); let num_threads = rayon::current_num_threads(); - let arc_cache_size = opt.arc_cache_size; - let max_nb_chunks = opt.max_nb_chunks; - let max_memory = opt.max_memory; + let arc_cache_size = opt.indexer.arc_cache_size; + let max_nb_chunks = opt.indexer.max_nb_chunks; + let max_memory = opt.indexer.max_memory; + let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type); + let chunk_compression_level = opt.indexer.chunk_compression_level; + let documents_compression_type = compression_type_from_str(&opt.documents_compression_type); + let documents_compression_level = opt.documents_compression_level; let csv_readers = match opt.csv_file { Some(file_path) => { @@ -472,7 +534,18 @@ fn main() -> anyhow::Result<()> { let readers = csv_readers .into_par_iter() .enumerate() - .map(|(i, rdr)| index_csv(rdr, i, num_threads, arc_cache_size, max_nb_chunks, max_memory)) + .map(|(i, rdr)| { + index_csv( + rdr, + i, + num_threads, + arc_cache_size, + max_nb_chunks, + max_memory, + chunk_compression_type, + chunk_compression_level, + ) + }) .collect::, _>>()?; let mut stores = Vec::with_capacity(readers.len()); @@ -494,7 +567,10 @@ fn main() -> anyhow::Result<()> { }, || { // We also merge the documents into its own MTBL store. let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(documents_path)?; - let mut writer = Writer::builder().compression_type(CompressionType::Zlib).compression_level(9).build(file); + let mut writer = Writer::builder() + .compression_type(documents_compression_type) + .compression_level(documents_compression_level) + .build(file); let mut builder = Merger::builder(docs_merge); builder.extend(docs_stores); builder.build().write_into(&mut writer)?;