diff --git a/Cargo.lock b/Cargo.lock
index ee1197f70..f0499caee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -577,7 +577,7 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
 [[package]]
 name = "grenad"
 version = "0.1.0"
-source = "git+https://github.com/Kerollmops/grenad.git?rev=a884670#a8846703ddee1f1ed86efd3168561606c244e135"
+source = "git+https://github.com/Kerollmops/grenad.git?rev=1094409#1094409c59f41d3896d487f9869c33343f59c233"
 dependencies = [
  "byteorder",
  "flate2",
diff --git a/Cargo.toml b/Cargo.toml
index cb8d0dd06..1e66054b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@ csv = "1.1.3"
 flate2 = "1.0.17"
 fst = "0.4.4"
 fxhash = "0.2.1"
-grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "dee0815" }
+grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "1094409" }
 heed = { version = "0.8.1", default-features = false, features = ["lmdb"] }
 human_format = "1.0.3"
 jemallocator = "0.3.2"
@@ -52,7 +52,6 @@ fst = "0.4.4"
 
 [features]
 default = []
-file-fuse = ["grenad/file-fuse"]
 
 [[bench]]
 name = "search"
diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index 22ea52ab1..52002f89e 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -110,8 +110,17 @@ struct IndexerOpt {
     #[structopt(long, requires = "chunk-compression-type")]
     chunk_compression_level: Option<u32>,
 
+    /// The number of bytes to remove from the beginning of the chunks while reading/sorting
+    /// or merging them.
+    ///
+    /// File fusing must only be enabled on file systems that support `FALLOC_FL_COLLAPSE_RANGE`
+    /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` flag is set.
     #[structopt(long, default_value = "4294967296")] // 4 GB
-    file_fusing_shrink_size: u64,
+    chunk_fusing_shrink_size: u64,
+
+    /// Enable or disable chunk fusing; this reduces the amount of disk space used by a factor of 2.
+    #[structopt(long)]
+    enable_chunk_fusing: bool,
 }
 
 fn format_count(n: usize) -> String {
@@ -131,10 +140,14 @@ fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Re
     builder.build(file)
 }
 
-fn writer_into_reader(writer: Writer<File>, shrink_size: u64) -> anyhow::Result<Reader<FileFuse>> {
+fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
     let mut file = writer.into_inner()?;
     file.seek(SeekFrom::Start(0))?;
-    let file = FileFuse::with_shrink_size(file, shrink_size);
+    let file = if let Some(shrink_size) = shrink_size {
+        FileFuse::builder().shrink_size(shrink_size).build(file)
+    } else {
+        FileFuse::new(file)
+    };
     Reader::new(file).map_err(Into::into)
 }
 
@@ -142,13 +155,15 @@ fn create_sorter(
     merge: MergeFn,
     chunk_compression_type: CompressionType,
     chunk_compression_level: Option<u32>,
-    file_fusing_shrink_size: u64,
+    chunk_fusing_shrink_size: Option<u64>,
     max_nb_chunks: Option<usize>,
     max_memory: Option<usize>,
 ) -> Sorter<MergeFn> {
     let mut builder = Sorter::builder(merge);
-    builder.file_fusing_shrink_size(file_fusing_shrink_size);
+    if let Some(shrink_size) = chunk_fusing_shrink_size {
+        builder.file_fusing_shrink_size(shrink_size);
+    }
     builder.chunk_compression_type(chunk_compression_type);
     if let Some(level) = chunk_compression_level {
         builder.chunk_compression_level(level);
     }
@@ -215,7 +230,7 @@ struct Store {
     // MTBL parameters
     chunk_compression_type: CompressionType,
     chunk_compression_level: Option<u32>,
-    file_fusing_shrink_size: u64,
+    chunk_fusing_shrink_size: Option<u64>,
     // MTBL sorters
     main_sorter: Sorter<MergeFn>,
     word_docids_sorter: Sorter<MergeFn>,
@@ -232,7 +247,7 @@ impl Store {
         max_memory: Option<usize>,
         chunk_compression_type: CompressionType,
         chunk_compression_level: Option<u32>,
-        file_fusing_shrink_size: u64,
+        chunk_fusing_shrink_size: Option<u64>,
     ) -> anyhow::Result<Store>
     {
         // We divide the max memory by the number of sorter the Store have.
@@ -242,7 +257,7 @@ impl Store {
             main_merge,
             chunk_compression_type,
             chunk_compression_level,
-            file_fusing_shrink_size,
+            chunk_fusing_shrink_size,
             max_nb_chunks,
             max_memory,
         );
@@ -250,7 +265,7 @@ impl Store {
             word_docids_merge,
             chunk_compression_type,
             chunk_compression_level,
-            file_fusing_shrink_size,
+            chunk_fusing_shrink_size,
             max_nb_chunks,
             max_memory,
         );
@@ -258,7 +273,7 @@ impl Store {
             words_pairs_proximities_docids_merge,
             chunk_compression_type,
             chunk_compression_level,
-            file_fusing_shrink_size,
+            chunk_fusing_shrink_size,
             max_nb_chunks,
             max_memory,
         );
@@ -278,7 +293,7 @@ impl Store {
             documents_ids: RoaringBitmap::new(),
             chunk_compression_type,
             chunk_compression_level,
-            file_fusing_shrink_size,
+            chunk_fusing_shrink_size,
 
             main_sorter,
             word_docids_sorter,
@@ -521,7 +536,7 @@ impl Store {
     fn finish(mut self) -> anyhow::Result {
         let comp_type = self.chunk_compression_type;
         let comp_level = self.chunk_compression_level;
-        let shrink_size = self.file_fusing_shrink_size;
+        let shrink_size = self.chunk_fusing_shrink_size;
 
         Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?;
         Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?;
@@ -754,9 +769,14 @@ fn main() -> anyhow::Result<()> {
     let max_memory_by_job = opt.indexer.max_memory / num_threads;
     let chunk_compression_type = opt.indexer.chunk_compression_type;
     let chunk_compression_level = opt.indexer.chunk_compression_level;
-    let file_fusing_shrink_size = opt.indexer.file_fusing_shrink_size;
     let log_every_n = opt.indexer.log_every_n;
 
+    let chunk_fusing_shrink_size = if opt.indexer.enable_chunk_fusing {
+        Some(opt.indexer.chunk_fusing_shrink_size)
+    } else {
+        None
+    };
+
     let readers = csv_readers(opt.csv_file, num_threads)?
         .into_par_iter()
         .enumerate()
@@ -767,7 +787,7 @@ fn main() -> anyhow::Result<()> {
                 Some(max_memory_by_job),
                 chunk_compression_type,
                 chunk_compression_level,
-                file_fusing_shrink_size,
+                chunk_fusing_shrink_size,
             )?;
             store.index_csv(rdr, i, num_threads, log_every_n)
         })
@@ -794,7 +814,7 @@ fn main() -> anyhow::Result<()> {
         })?;
         let merger = merge_readers(readers, merge);
         merger.write_into(&mut writer)?;
-        writer_into_reader(writer, file_fusing_shrink_size)
+        writer_into_reader(writer, chunk_fusing_shrink_size)
     };
 
     // The enum and the channel which is used to transfert
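Not part of the patch, but a compact sketch of how the two new options are meant to interact at runtime may help review. It only uses calls that already appear in the diff (`FileFuse::builder`, `FileFuse::new`, `Reader::new`, `Writer::into_inner`); the helper names below are illustrative, not identifiers from the codebase.

```rust
use std::fs::File;
use std::io::{Seek, SeekFrom};

use grenad::{FileFuse, Reader, Writer};

// `--enable-chunk-fusing` gates `--chunk-fusing-shrink-size` into an Option<u64>:
// when fusing is disabled, the shrink size is simply ignored.
fn fusing_shrink_size(enable_chunk_fusing: bool, chunk_fusing_shrink_size: u64) -> Option<u64> {
    if enable_chunk_fusing {
        Some(chunk_fusing_shrink_size)
    } else {
        None
    }
}

// Turn a finished chunk writer back into a reader, wrapping the file in a
// fusing `FileFuse` only when a shrink size was provided (this mirrors the
// `writer_into_reader` helper in the patch).
fn into_fused_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
    let mut file = writer.into_inner()?;
    file.seek(SeekFrom::Start(0))?;
    let file = match shrink_size {
        Some(size) => FileFuse::builder().shrink_size(size).build(file),
        None => FileFuse::new(file),
    };
    Reader::new(file).map_err(Into::into)
}
```

Keeping the shrink size as an `Option<u64>` means the `FALLOC_FL_COLLAPSE_RANGE`-based code path is never taken unless `--enable-chunk-fusing` is passed explicitly, so behaviour on file systems that do not support it is unchanged by default.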