Introduce the enable-chunk-fusing flag

This commit is contained in:
Clément Renault 2020-10-14 18:39:43 +02:00
parent f980422c57
commit 9021b2dba6
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 37 additions and 18 deletions

2
Cargo.lock generated
View File

@ -577,7 +577,7 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]] [[package]]
name = "grenad" name = "grenad"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/Kerollmops/grenad.git?rev=a884670#a8846703ddee1f1ed86efd3168561606c244e135" source = "git+https://github.com/Kerollmops/grenad.git?rev=1094409#1094409c59f41d3896d487f9869c33343f59c233"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"flate2", "flate2",

View File

@ -13,7 +13,7 @@ csv = "1.1.3"
flate2 = "1.0.17" flate2 = "1.0.17"
fst = "0.4.4" fst = "0.4.4"
fxhash = "0.2.1" fxhash = "0.2.1"
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "dee0815" } grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "1094409" }
heed = { version = "0.8.1", default-features = false, features = ["lmdb"] } heed = { version = "0.8.1", default-features = false, features = ["lmdb"] }
human_format = "1.0.3" human_format = "1.0.3"
jemallocator = "0.3.2" jemallocator = "0.3.2"
@ -52,7 +52,6 @@ fst = "0.4.4"
[features] [features]
default = [] default = []
file-fuse = ["grenad/file-fuse"]
[[bench]] [[bench]]
name = "search" name = "search"

View File

@ -110,8 +110,17 @@ struct IndexerOpt {
#[structopt(long, requires = "chunk-compression-type")] #[structopt(long, requires = "chunk-compression-type")]
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
/// The number of bytes to remove from the begining of the chunks while reading/sorting
/// or merging them.
///
/// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`,
/// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set.
#[structopt(long, default_value = "4294967296")] // 4 GB #[structopt(long, default_value = "4294967296")] // 4 GB
file_fusing_shrink_size: u64, chunk_fusing_shrink_size: u64,
/// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2.
#[structopt(long)]
enable_chunk_fusing: bool,
} }
fn format_count(n: usize) -> String { fn format_count(n: usize) -> String {
@ -131,10 +140,14 @@ fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Re
builder.build(file) builder.build(file)
} }
fn writer_into_reader(writer: Writer<File>, shrink_size: u64) -> anyhow::Result<Reader<FileFuse>> { fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
let mut file = writer.into_inner()?; let mut file = writer.into_inner()?;
file.seek(SeekFrom::Start(0))?; file.seek(SeekFrom::Start(0))?;
let file = FileFuse::with_shrink_size(file, shrink_size); let file = if let Some(shrink_size) = shrink_size {
FileFuse::builder().shrink_size(shrink_size).build(file)
} else {
FileFuse::new(file)
};
Reader::new(file).map_err(Into::into) Reader::new(file).map_err(Into::into)
} }
@ -142,13 +155,15 @@ fn create_sorter(
merge: MergeFn, merge: MergeFn,
chunk_compression_type: CompressionType, chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
file_fusing_shrink_size: u64, chunk_fusing_shrink_size: Option<u64>,
max_nb_chunks: Option<usize>, max_nb_chunks: Option<usize>,
max_memory: Option<usize>, max_memory: Option<usize>,
) -> Sorter<MergeFn> ) -> Sorter<MergeFn>
{ {
let mut builder = Sorter::builder(merge); let mut builder = Sorter::builder(merge);
builder.file_fusing_shrink_size(file_fusing_shrink_size); if let Some(shrink_size) = chunk_fusing_shrink_size {
builder.file_fusing_shrink_size(shrink_size);
}
builder.chunk_compression_type(chunk_compression_type); builder.chunk_compression_type(chunk_compression_type);
if let Some(level) = chunk_compression_level { if let Some(level) = chunk_compression_level {
builder.chunk_compression_level(level); builder.chunk_compression_level(level);
@ -215,7 +230,7 @@ struct Store {
// MTBL parameters // MTBL parameters
chunk_compression_type: CompressionType, chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
file_fusing_shrink_size: u64, chunk_fusing_shrink_size: Option<u64>,
// MTBL sorters // MTBL sorters
main_sorter: Sorter<MergeFn>, main_sorter: Sorter<MergeFn>,
word_docids_sorter: Sorter<MergeFn>, word_docids_sorter: Sorter<MergeFn>,
@ -232,7 +247,7 @@ impl Store {
max_memory: Option<usize>, max_memory: Option<usize>,
chunk_compression_type: CompressionType, chunk_compression_type: CompressionType,
chunk_compression_level: Option<u32>, chunk_compression_level: Option<u32>,
file_fusing_shrink_size: u64, chunk_fusing_shrink_size: Option<u64>,
) -> anyhow::Result<Store> ) -> anyhow::Result<Store>
{ {
// We divide the max memory by the number of sorter the Store have. // We divide the max memory by the number of sorter the Store have.
@ -242,7 +257,7 @@ impl Store {
main_merge, main_merge,
chunk_compression_type, chunk_compression_type,
chunk_compression_level, chunk_compression_level,
file_fusing_shrink_size, chunk_fusing_shrink_size,
max_nb_chunks, max_nb_chunks,
max_memory, max_memory,
); );
@ -250,7 +265,7 @@ impl Store {
word_docids_merge, word_docids_merge,
chunk_compression_type, chunk_compression_type,
chunk_compression_level, chunk_compression_level,
file_fusing_shrink_size, chunk_fusing_shrink_size,
max_nb_chunks, max_nb_chunks,
max_memory, max_memory,
); );
@ -258,7 +273,7 @@ impl Store {
words_pairs_proximities_docids_merge, words_pairs_proximities_docids_merge,
chunk_compression_type, chunk_compression_type,
chunk_compression_level, chunk_compression_level,
file_fusing_shrink_size, chunk_fusing_shrink_size,
max_nb_chunks, max_nb_chunks,
max_memory, max_memory,
); );
@ -278,7 +293,7 @@ impl Store {
documents_ids: RoaringBitmap::new(), documents_ids: RoaringBitmap::new(),
chunk_compression_type, chunk_compression_type,
chunk_compression_level, chunk_compression_level,
file_fusing_shrink_size, chunk_fusing_shrink_size,
main_sorter, main_sorter,
word_docids_sorter, word_docids_sorter,
@ -521,7 +536,7 @@ impl Store {
fn finish(mut self) -> anyhow::Result<Readers> { fn finish(mut self) -> anyhow::Result<Readers> {
let comp_type = self.chunk_compression_type; let comp_type = self.chunk_compression_type;
let comp_level = self.chunk_compression_level; let comp_level = self.chunk_compression_level;
let shrink_size = self.file_fusing_shrink_size; let shrink_size = self.chunk_fusing_shrink_size;
Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?; Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?;
Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?; Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?;
@ -754,9 +769,14 @@ fn main() -> anyhow::Result<()> {
let max_memory_by_job = opt.indexer.max_memory / num_threads; let max_memory_by_job = opt.indexer.max_memory / num_threads;
let chunk_compression_type = opt.indexer.chunk_compression_type; let chunk_compression_type = opt.indexer.chunk_compression_type;
let chunk_compression_level = opt.indexer.chunk_compression_level; let chunk_compression_level = opt.indexer.chunk_compression_level;
let file_fusing_shrink_size = opt.indexer.file_fusing_shrink_size;
let log_every_n = opt.indexer.log_every_n; let log_every_n = opt.indexer.log_every_n;
let chunk_fusing_shrink_size = if opt.indexer.enable_chunk_fusing {
Some(opt.indexer.chunk_fusing_shrink_size)
} else {
None
};
let readers = csv_readers(opt.csv_file, num_threads)? let readers = csv_readers(opt.csv_file, num_threads)?
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
@ -767,7 +787,7 @@ fn main() -> anyhow::Result<()> {
Some(max_memory_by_job), Some(max_memory_by_job),
chunk_compression_type, chunk_compression_type,
chunk_compression_level, chunk_compression_level,
file_fusing_shrink_size, chunk_fusing_shrink_size,
)?; )?;
store.index_csv(rdr, i, num_threads, log_every_n) store.index_csv(rdr, i, num_threads, log_every_n)
}) })
@ -794,7 +814,7 @@ fn main() -> anyhow::Result<()> {
})?; })?;
let merger = merge_readers(readers, merge); let merger = merge_readers(readers, merge);
merger.write_into(&mut writer)?; merger.write_into(&mut writer)?;
writer_into_reader(writer, file_fusing_shrink_size) writer_into_reader(writer, chunk_fusing_shrink_size)
}; };
// The enum and the channel which is used to transfert // The enum and the channel which is used to transfert