Introducing more arguments to specify the different compression algorithms

This commit is contained in:
Clément Renault 2020-08-21 16:41:26 +02:00
parent 02335ee72d
commit ada30c2789
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -40,7 +40,8 @@ const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
#[structopt(name = "milli-indexer", about = "The indexer binary of the milli project.")] #[structopt(name = "milli-indexer")]
/// The indexer binary of the milli project.
struct Opt { struct Opt {
/// The database path where the database is located. /// The database path where the database is located.
/// It is created if it doesn't already exist. /// It is created if it doesn't already exist.
@ -56,6 +57,28 @@ struct Opt {
#[structopt(short, long)] #[structopt(short, long)]
jobs: Option<usize>, jobs: Option<usize>,
#[structopt(flatten)]
indexer: IndexerOpt,
/// The name of the compression algorithm to use when compressing the final documents database.
#[structopt(long, default_value = "zlib", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
documents_compression_type: String,
/// The level of compression of the chosen algorithm.
#[structopt(long, default_value = "9")]
documents_compression_level: u32,
/// Verbose mode (-v, -vv, -vvv, etc.)
#[structopt(short, long, parse(from_occurrences))]
verbose: usize,
/// CSV file to index, if unspecified the CSV is read from standard input.
/// Note that it is much faster to index from a file.
csv_file: Option<PathBuf>,
}
#[derive(Debug, StructOpt)]
struct IndexerOpt {
/// MTBL max number of chunks in bytes. /// MTBL max number of chunks in bytes.
#[structopt(long)] #[structopt(long)]
max_nb_chunks: Option<usize>, max_nb_chunks: Option<usize>,
@ -68,13 +91,27 @@ struct Opt {
#[structopt(long)] #[structopt(long)]
arc_cache_size: Option<usize>, arc_cache_size: Option<usize>,
/// Verbose mode (-v, -vv, -vvv, etc.) /// The name of the compression algorithm to use when compressing intermediate
#[structopt(short, long, parse(from_occurrences))] /// chunks during indexing documents.
verbose: usize, ///
/// Choosing a fast algorithm will make the indexing faster but may consume more memory.
#[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
chunk_compression_type: String,
/// CSV file to index, if unspecified the CSV is read from standard input. /// The level of compression of the chosen algorithm.
/// Note that it is much faster to index from a file. #[structopt(long, requires = "chunk-compression-type")]
csv_file: Option<PathBuf>, chunk_compression_level: Option<u32>,
}
fn compression_type_from_str(name: &str) -> CompressionType {
match name {
"snappy" => CompressionType::Snappy,
"zlib" => CompressionType::Zlib,
"lz4" => CompressionType::Lz4,
"lz4hc" => CompressionType::Lz4hc,
"zstd" => CompressionType::Zstd,
_ => panic!("invalid compression algorithm"),
}
} }
fn lmdb_key_valid_size(key: &[u8]) -> bool { fn lmdb_key_valid_size(key: &[u8]) -> bool {
@ -92,9 +129,19 @@ struct Store {
} }
impl Store { impl Store {
fn new(arc_cache_size: Option<usize>, max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store { fn new(
arc_cache_size: Option<usize>,
max_nb_chunks: Option<usize>,
max_memory: Option<usize>,
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> Store
{
let mut builder = Sorter::builder(merge as MergeFn); let mut builder = Sorter::builder(merge as MergeFn);
builder.chunk_compression_type(CompressionType::Snappy); builder.chunk_compression_type(chunk_compression_type);
if let Some(level) = chunk_compression_level {
builder.chunk_compression_level(level);
}
if let Some(nb_chunks) = max_nb_chunks { if let Some(nb_chunks) = max_nb_chunks {
builder.max_nb_chunks(nb_chunks); builder.max_nb_chunks(nb_chunks);
} }
@ -103,7 +150,10 @@ impl Store {
} }
let mut documents_builder = Sorter::builder(docs_merge as MergeFn); let mut documents_builder = Sorter::builder(docs_merge as MergeFn);
documents_builder.chunk_compression_type(CompressionType::Snappy); documents_builder.chunk_compression_type(chunk_compression_type);
if let Some(level) = chunk_compression_level {
builder.chunk_compression_level(level);
}
let arc_cache_size = arc_cache_size.unwrap_or(65_535); let arc_cache_size = arc_cache_size.unwrap_or(65_535);
@ -354,11 +404,19 @@ fn index_csv(
arc_cache_size: Option<usize>, arc_cache_size: Option<usize>,
max_nb_chunks: Option<usize>, max_nb_chunks: Option<usize>,
max_memory: Option<usize>, max_memory: Option<usize>,
chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>,
) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)> ) -> anyhow::Result<(Reader<Mmap>, Reader<Mmap>)>
{ {
debug!("{:?}: Indexing into a Store...", thread_index); debug!("{:?}: Indexing into a Store...", thread_index);
let mut store = Store::new(arc_cache_size, max_nb_chunks, max_memory); let mut store = Store::new(
arc_cache_size,
max_nb_chunks,
max_memory,
chunk_compression_type,
chunk_compression_level,
);
// Write the headers into a Vec of bytes and then into the store. // Write the headers into a Vec of bytes and then into the store.
let headers = rdr.headers()?; let headers = rdr.headers()?;
@ -427,9 +485,13 @@ fn main() -> anyhow::Result<()> {
let documents_path = opt.database.join("documents.mtbl"); let documents_path = opt.database.join("documents.mtbl");
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
let arc_cache_size = opt.arc_cache_size; let arc_cache_size = opt.indexer.arc_cache_size;
let max_nb_chunks = opt.max_nb_chunks; let max_nb_chunks = opt.indexer.max_nb_chunks;
let max_memory = opt.max_memory; let max_memory = opt.indexer.max_memory;
let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type);
let chunk_compression_level = opt.indexer.chunk_compression_level;
let documents_compression_type = compression_type_from_str(&opt.documents_compression_type);
let documents_compression_level = opt.documents_compression_level;
let csv_readers = match opt.csv_file { let csv_readers = match opt.csv_file {
Some(file_path) => { Some(file_path) => {
@ -472,7 +534,18 @@ fn main() -> anyhow::Result<()> {
let readers = csv_readers let readers = csv_readers
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
.map(|(i, rdr)| index_csv(rdr, i, num_threads, arc_cache_size, max_nb_chunks, max_memory)) .map(|(i, rdr)| {
index_csv(
rdr,
i,
num_threads,
arc_cache_size,
max_nb_chunks,
max_memory,
chunk_compression_type,
chunk_compression_level,
)
})
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
let mut stores = Vec::with_capacity(readers.len()); let mut stores = Vec::with_capacity(readers.len());
@ -494,7 +567,10 @@ fn main() -> anyhow::Result<()> {
}, || { }, || {
// We also merge the documents into its own MTBL store. // We also merge the documents into its own MTBL store.
let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(documents_path)?; let file = OpenOptions::new().create(true).truncate(true).write(true).read(true).open(documents_path)?;
let mut writer = Writer::builder().compression_type(CompressionType::Zlib).compression_level(9).build(file); let mut writer = Writer::builder()
.compression_type(documents_compression_type)
.compression_level(documents_compression_level)
.build(file);
let mut builder = Merger::builder(docs_merge); let mut builder = Merger::builder(docs_merge);
builder.extend(docs_stores); builder.extend(docs_stores);
builder.build().write_into(&mut writer)?; builder.build().write_into(&mut writer)?;