From 3889d956d931b8b358205651527a038502b42c79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 26 Oct 2020 20:18:10 +0100 Subject: [PATCH] Introduce the UpdateBuilder and use it in the HTTP routes --- src/indexing/mod.rs | 410 ---------------- src/lib.rs | 1 - src/subcommand/serve.rs | 146 ++++-- .../index_documents}/merge_function.rs | 16 + src/update/index_documents/mod.rs | 446 +++++++++++++++++- .../index_documents}/store.rs | 15 +- .../index_documents}/transform.rs | 66 ++- src/update/update_builder.rs | 78 +-- 8 files changed, 641 insertions(+), 537 deletions(-) delete mode 100644 src/indexing/mod.rs rename src/{indexing => update/index_documents}/merge_function.rs (83%) rename src/{indexing => update/index_documents}/store.rs (97%) rename src/{indexing => update/index_documents}/transform.rs (79%) diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs deleted file mode 100644 index 6cbdccf33..000000000 --- a/src/indexing/mod.rs +++ /dev/null @@ -1,410 +0,0 @@ -use std::borrow::Cow; -use std::fs::File; -use std::io::{self, Seek, SeekFrom}; -use std::sync::mpsc::sync_channel; -use std::time::Instant; - -use anyhow::Context; -use bstr::ByteSlice as _; -use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; -use heed::types::ByteSlice; -use log::{debug, info, error}; -use rayon::prelude::*; -use roaring::RoaringBitmap; -use structopt::StructOpt; -use tempfile::tempfile; - -use crate::FieldsIdsMap; -use crate::index::Index; -use self::store::Store; -use self::merge_function::{ - main_merge, word_docids_merge, words_pairs_proximities_docids_merge, - docid_word_positions_merge, documents_merge, -}; - -pub use self::transform::{Transform, TransformOutput}; - -mod merge_function; -mod store; -mod transform; - -#[derive(Debug, Clone, StructOpt)] -pub struct IndexerOpt { - /// The amount of documents to skip before printing - /// a log regarding the indexing advancement. - #[structopt(long, default_value = "1000000")] // 1m - pub log_every_n: usize, - - /// MTBL max number of chunks in bytes. - #[structopt(long)] - pub max_nb_chunks: Option, - - /// The maximum amount of memory to use for the MTBL buffer. It is recommended - /// to use something like 80%-90% of the available memory. - /// - /// It is automatically split by the number of jobs e.g. if you use 7 jobs - /// and 7 GB of max memory, each thread will use a maximum of 1 GB. - #[structopt(long, default_value = "7516192768")] // 7 GB - pub max_memory: usize, - - /// Size of the linked hash map cache when indexing. - /// The bigger it is, the faster the indexing is but the more memory it takes. - #[structopt(long, default_value = "500")] - pub linked_hash_map_size: usize, - - /// The name of the compression algorithm to use when compressing intermediate - /// chunks during indexing documents. - /// - /// Choosing a fast algorithm will make the indexing faster but may consume more memory. - #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] - pub chunk_compression_type: CompressionType, - - /// The level of compression of the chosen algorithm. - #[structopt(long, requires = "chunk-compression-type")] - pub chunk_compression_level: Option, - - /// The number of bytes to remove from the begining of the chunks while reading/sorting - /// or merging them. - /// - /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, - /// (i.e. ext4 and XFS). 
File fusing will only work if the `enable-chunk-fusing` is set. - #[structopt(long, default_value = "4294967296")] // 4 GB - pub chunk_fusing_shrink_size: u64, - - /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. - #[structopt(long)] - pub enable_chunk_fusing: bool, - - /// Number of parallel jobs for indexing, defaults to # of CPUs. - #[structopt(long)] - pub indexing_jobs: Option, -} - -#[derive(Debug, Copy, Clone)] -enum WriteMethod { - Append, - GetMergePut, -} - -type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result>; - -fn create_writer(typ: CompressionType, level: Option, file: File) -> io::Result> { - let mut builder = Writer::builder(); - builder.compression_type(typ); - if let Some(level) = level { - builder.compression_level(level); - } - builder.build(file) -} - -fn create_sorter( - merge: MergeFn, - chunk_compression_type: CompressionType, - chunk_compression_level: Option, - chunk_fusing_shrink_size: Option, - max_nb_chunks: Option, - max_memory: Option, -) -> Sorter -{ - let mut builder = Sorter::builder(merge); - if let Some(shrink_size) = chunk_fusing_shrink_size { - builder.file_fusing_shrink_size(shrink_size); - } - builder.chunk_compression_type(chunk_compression_type); - if let Some(level) = chunk_compression_level { - builder.chunk_compression_level(level); - } - if let Some(nb_chunks) = max_nb_chunks { - builder.max_nb_chunks(nb_chunks); - } - if let Some(memory) = max_memory { - builder.max_memory(memory); - } - builder.build() -} - -fn writer_into_reader(writer: Writer, shrink_size: Option) -> anyhow::Result> { - let mut file = writer.into_inner()?; - file.seek(SeekFrom::Start(0))?; - let file = if let Some(shrink_size) = shrink_size { - FileFuse::builder().shrink_size(shrink_size).build(file) - } else { - FileFuse::new(file) - }; - Reader::new(file).map_err(Into::into) -} - -fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { - let mut builder = Merger::builder(merge); - builder.extend(sources); - builder.build() -} - -fn merge_into_lmdb_database( - wtxn: &mut heed::RwTxn, - database: heed::PolyDatabase, - sources: Vec>, - merge: MergeFn, - method: WriteMethod, -) -> anyhow::Result<()> { - debug!("Merging {} MTBL stores...", sources.len()); - let before = Instant::now(); - - let merger = merge_readers(sources, merge); - let mut in_iter = merger.into_merge_iter()?; - - match method { - WriteMethod::Append => { - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = in_iter.next()? { - out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; - } - }, - WriteMethod::GetMergePut => { - while let Some((k, v)) = in_iter.next()? { - match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { - Some(old_val) => { - let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; - let val = merge(k, &vals).expect("merge failed"); - database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? 
- }, - None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, - } - } - }, - } - - debug!("MTBL stores merged in {:.02?}!", before.elapsed()); - Ok(()) -} - -fn write_into_lmdb_database( - wtxn: &mut heed::RwTxn, - database: heed::PolyDatabase, - mut reader: Reader, - merge: MergeFn, - method: WriteMethod, -) -> anyhow::Result<()> { - debug!("Writing MTBL stores..."); - let before = Instant::now(); - - match method { - WriteMethod::Append => { - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = reader.next()? { - out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; - } - }, - WriteMethod::GetMergePut => { - while let Some((k, v)) = reader.next()? { - match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { - Some(old_val) => { - let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; - let val = merge(k, &vals).expect("merge failed"); - database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? - }, - None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, - } - } - } - } - - debug!("MTBL stores merged in {:.02?}!", before.elapsed()); - Ok(()) -} - -pub fn run( - env: &heed::Env, - index: &Index, - opt: &IndexerOpt, - fields_ids_map: FieldsIdsMap, - users_ids_documents_ids: fst::Map>, - new_documents_ids: RoaringBitmap, - documents: grenad::Reader<&[u8]>, - documents_count: u32, - progress_callback: F, -) -> anyhow::Result<()> -where F: Fn(u32, u32) + Sync + Send, -{ - let jobs = opt.indexing_jobs.unwrap_or(0); - let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; - pool.install(|| { - run_intern( - env, - index, - opt, - fields_ids_map, - users_ids_documents_ids, - new_documents_ids, - documents, - documents_count, - progress_callback, - ) - }) -} - -fn run_intern( - env: &heed::Env, - index: &Index, - opt: &IndexerOpt, - fields_ids_map: FieldsIdsMap, - users_ids_documents_ids: fst::Map>, - new_documents_ids: RoaringBitmap, - documents: grenad::Reader<&[u8]>, - documents_count: u32, - progress_callback: F, -) -> anyhow::Result<()> -where F: Fn(u32, u32) + Sync + Send, -{ - let before_indexing = Instant::now(); - let num_threads = rayon::current_num_threads(); - let linked_hash_map_size = opt.linked_hash_map_size; - let max_nb_chunks = opt.max_nb_chunks; - let max_memory_by_job = opt.max_memory / num_threads; - let chunk_compression_type = opt.chunk_compression_type; - let chunk_compression_level = opt.chunk_compression_level; - let log_every_n = opt.log_every_n; - - let chunk_fusing_shrink_size = if opt.enable_chunk_fusing { - Some(opt.chunk_fusing_shrink_size) - } else { - None - }; - - let readers = rayon::iter::repeatn(documents, num_threads) - .enumerate() - .map(|(i, documents)| { - let store = Store::new( - linked_hash_map_size, - max_nb_chunks, - Some(max_memory_by_job), - chunk_compression_type, - chunk_compression_level, - chunk_fusing_shrink_size, - )?; - store.index(documents, documents_count, i, num_threads, log_every_n, &progress_callback) - }) - .collect::, _>>()?; - - let mut main_readers = Vec::with_capacity(readers.len()); - let mut word_docids_readers = Vec::with_capacity(readers.len()); - let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); - let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); - let mut documents_readers = Vec::with_capacity(readers.len()); - readers.into_iter().for_each(|readers| { - main_readers.push(readers.main); - word_docids_readers.push(readers.word_docids); - 
docid_word_positions_readers.push(readers.docid_word_positions); - words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); - documents_readers.push(readers.documents); - }); - - // This is the function that merge the readers - // by using the given merge function. - let merge_readers = move |readers, merge| { - let mut writer = tempfile().and_then(|f| { - create_writer(chunk_compression_type, chunk_compression_level, f) - })?; - let merger = merge_readers(readers, merge); - merger.write_into(&mut writer)?; - writer_into_reader(writer, chunk_fusing_shrink_size) - }; - - // The enum and the channel which is used to transfert - // the readers merges potentially done on another thread. - enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; - let (sender, receiver) = sync_channel(3); - - debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); - rayon::spawn(move || { - vec![ - (DatabaseType::Main, main_readers, main_merge as MergeFn), - (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), - ( - DatabaseType::WordsPairsProximitiesDocids, - words_pairs_proximities_docids_readers, - words_pairs_proximities_docids_merge, - ), - ] - .into_par_iter() - .for_each(|(dbtype, readers, merge)| { - let result = merge_readers(readers, merge); - if let Err(e) = sender.send((dbtype, result)) { - error!("sender error: {}", e); - } - }); - }); - - // We create the write transaction of this update. - // TODO we must get this transaction as an argument to be able - // to first delete the replaced documents for example. - let mut wtxn = env.write_txn()?; - - let mut documents_ids = index.documents_ids(&wtxn)?; - let contains_documents = !documents_ids.is_empty(); - let write_method = if contains_documents { - WriteMethod::GetMergePut - } else { - WriteMethod::Append - }; - - // We write the fields ids map into the main database - index.put_fields_ids_map(&mut wtxn, &fields_ids_map)?; - - // We write the users_ids_documents_ids into the main database. - index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?; - - // We merge the new documents ids with the existing ones. 
- documents_ids.union_with(&new_documents_ids); - index.put_documents_ids(&mut wtxn, &documents_ids)?; - - debug!("Writing the docid word positions into LMDB on disk..."); - merge_into_lmdb_database( - &mut wtxn, - *index.docid_word_positions.as_polymorph(), - docid_word_positions_readers, - docid_word_positions_merge, - write_method - )?; - - debug!("Writing the documents into LMDB on disk..."); - merge_into_lmdb_database( - &mut wtxn, - *index.documents.as_polymorph(), - documents_readers, - documents_merge, - write_method - )?; - - for (db_type, result) in receiver { - let content = result?; - match db_type { - DatabaseType::Main => { - debug!("Writing the main elements into LMDB on disk..."); - write_into_lmdb_database(&mut wtxn, index.main, content, main_merge, write_method)?; - }, - DatabaseType::WordDocids => { - debug!("Writing the words docids into LMDB on disk..."); - let db = *index.word_docids.as_polymorph(); - write_into_lmdb_database(&mut wtxn, db, content, word_docids_merge, write_method)?; - }, - DatabaseType::WordsPairsProximitiesDocids => { - debug!("Writing the words pairs proximities docids into LMDB on disk..."); - let db = *index.word_pair_proximity_docids.as_polymorph(); - write_into_lmdb_database( - &mut wtxn, - db, - content, - words_pairs_proximities_docids_merge, - write_method, - )?; - }, - } - } - - wtxn.commit()?; - - info!("Update processed in {:.02?}", before_indexing.elapsed()); - - Ok(()) -} diff --git a/src/lib.rs b/src/lib.rs index da04b682d..9ac492143 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,6 @@ mod criterion; mod fields_ids_map; mod index; -mod indexing; mod mdfs; mod query_tokens; mod search; diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 8a7ba3784..379bb68db 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -11,6 +11,7 @@ use askama_warp::Template; use flate2::read::GzDecoder; use futures::stream; use futures::{FutureExt, StreamExt}; +use grenad::CompressionType; use heed::EnvOpenOptions; use indexmap::IndexMap; use serde::{Serialize, Deserialize}; @@ -21,9 +22,8 @@ use tokio::sync::broadcast; use warp::filters::ws::Message; use warp::{Filter, http::Response}; -use crate::indexing::{self, IndexerOpt, Transform, TransformOutput}; use crate::tokenizer::{simple_tokenizer, TokenType}; -use crate::update::AvailableDocumentsIds; +use crate::update::{UpdateBuilder, IndexDocumentsMethod}; use crate::{Index, UpdateStore, SearchResult}; #[derive(Debug, StructOpt)] @@ -60,6 +60,58 @@ pub struct Opt { indexer: IndexerOpt, } +#[derive(Debug, Clone, StructOpt)] +pub struct IndexerOpt { + /// The amount of documents to skip before printing + /// a log regarding the indexing advancement. + #[structopt(long, default_value = "1000000")] // 1m + pub log_every_n: usize, + + /// MTBL max number of chunks in bytes. + #[structopt(long)] + pub max_nb_chunks: Option, + + /// The maximum amount of memory to use for the MTBL buffer. It is recommended + /// to use something like 80%-90% of the available memory. + /// + /// It is automatically split by the number of jobs e.g. if you use 7 jobs + /// and 7 GB of max memory, each thread will use a maximum of 1 GB. + #[structopt(long, default_value = "7516192768")] // 7 GB + pub max_memory: usize, + + /// Size of the linked hash map cache when indexing. + /// The bigger it is, the faster the indexing is but the more memory it takes. 
+ #[structopt(long, default_value = "500")] + pub linked_hash_map_size: usize, + + /// The name of the compression algorithm to use when compressing intermediate + /// chunks during indexing documents. + /// + /// Choosing a fast algorithm will make the indexing faster but may consume more memory. + #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] + pub chunk_compression_type: CompressionType, + + /// The level of compression of the chosen algorithm. + #[structopt(long, requires = "chunk-compression-type")] + pub chunk_compression_level: Option, + + /// The number of bytes to remove from the begining of the chunks while reading/sorting + /// or merging them. + /// + /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, + /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set. + #[structopt(long, default_value = "4294967296")] // 4 GB + pub chunk_fusing_shrink_size: u64, + + /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. + #[structopt(long)] + pub enable_chunk_fusing: bool, + + /// Number of parallel jobs for indexing, defaults to # of CPUs. + #[structopt(long)] + pub indexing_jobs: Option, +} + fn highlight_record(record: &mut IndexMap, words: &HashSet) { for (_key, value) in record.iter_mut() { let old_value = mem::take(value); @@ -152,25 +204,36 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { update_store_options, update_store_path, move |update_id, meta, content| { - let result = match meta { + // We prepare the update by using the update builder. + let mut update_builder = UpdateBuilder::new(); + if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { + update_builder.max_nb_chunks(max_nb_chunks); + } + if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level { + update_builder.chunk_compression_level(chunk_compression_level); + } + if let Some(indexing_jobs) = indexer_opt_cloned.indexing_jobs { + update_builder.indexing_jobs(indexing_jobs); + } + update_builder.log_every_n(indexer_opt_cloned.log_every_n); + update_builder.max_memory(indexer_opt_cloned.max_memory); + update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size); + update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type); + update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size); + + // we extract the update type and execute the update itself. + let result: anyhow::Result<()> = match meta { UpdateMeta::DocumentsAddition => { // We must use the write transaction of the update here. 
- let rtxn = env_cloned.read_txn()?; - let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?; - let documents_ids = index_cloned.documents_ids(&rtxn)?; - let available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); - let users_ids_documents_ids = index_cloned.users_ids_documents_ids(&rtxn).unwrap(); + let mut wtxn = env_cloned.write_txn()?; + let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); - let transform = Transform { - fields_ids_map, - available_documents_ids, - users_ids_documents_ids, - chunk_compression_type: indexer_opt_cloned.chunk_compression_type, - chunk_compression_level: indexer_opt_cloned.chunk_compression_level, - chunk_fusing_shrink_size: Some(indexer_opt_cloned.chunk_fusing_shrink_size), - max_nb_chunks: indexer_opt_cloned.max_nb_chunks, - max_memory: Some(indexer_opt_cloned.max_memory), - }; + let replace_documents = true; + if replace_documents { + builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); + } else { + builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); + } let gzipped = false; let reader = if gzipped { @@ -179,41 +242,22 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { Box::new(content) as Box }; - let TransformOutput { - fields_ids_map, - users_ids_documents_ids, - new_documents_ids, - replaced_documents_ids, - documents_count, - documents_file, - } = transform.from_csv(reader).unwrap(); + let result = builder.execute(reader, |count, total| { + let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { + update_id, + meta: UpdateMetaProgress::DocumentsAddition { + processed_number_of_documents: count, + total_number_of_documents: Some(total), + } + }); + }); - drop(rtxn); - - let mmap = unsafe { memmap::Mmap::map(&documents_file)? }; - let documents = grenad::Reader::new(mmap.as_ref()).unwrap(); - - indexing::run( - &env_cloned, - &index_cloned, - &indexer_opt_cloned, - fields_ids_map, - users_ids_documents_ids, - new_documents_ids, - documents, - documents_count as u32, - |count, total| { - // We send progress status... 
- let meta = UpdateMetaProgress::DocumentsAddition { - processed_number_of_documents: count as usize, - total_number_of_documents: Some(total as usize), - }; - let progress = UpdateStatus::Progressing { update_id, meta }; - let _ = update_status_sender_cloned.send(progress); - }, - ) + match result { + Ok(()) => wtxn.commit().map_err(Into::into), + Err(e) => Err(e.into()) + } }, - UpdateMeta::DocumentsAdditionFromPath { path } => { + UpdateMeta::DocumentsAdditionFromPath { path: _ } => { todo!() } }; diff --git a/src/indexing/merge_function.rs b/src/update/index_documents/merge_function.rs similarity index 83% rename from src/indexing/merge_function.rs rename to src/update/index_documents/merge_function.rs index 3e957394f..941b17536 100644 --- a/src/indexing/merge_function.rs +++ b/src/update/index_documents/merge_function.rs @@ -69,3 +69,19 @@ pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) - pub fn documents_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result> { bail!("merging documents is an error ({:?})", key.as_bstr()) } + +pub fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec) { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + buffer.clear(); + + let mut writer = obkv::KvWriter::new(buffer); + for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) { + match eob { + Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(), + } + } + + writer.finish().unwrap(); +} diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 0cdd68df1..d8ffbca43 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -1,4 +1,161 @@ -use crate::Index; +use std::borrow::Cow; +use std::fs::File; +use std::io::{self, Seek, SeekFrom}; +use std::sync::mpsc::sync_channel; +use std::time::Instant; + +use anyhow::Context; +use bstr::ByteSlice as _; +use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; +use heed::types::ByteSlice; +use log::{debug, info, error}; +use rayon::prelude::*; +use crate::index::Index; +use self::store::Store; +use self::merge_function::{ + main_merge, word_docids_merge, words_pairs_proximities_docids_merge, + docid_word_positions_merge, documents_merge, +}; +pub use self::transform::{Transform, TransformOutput}; + +use super::UpdateBuilder; + +mod merge_function; +mod store; +mod transform; + +#[derive(Debug, Copy, Clone)] +enum WriteMethod { + Append, + GetMergePut, +} + +type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result>; + +fn create_writer(typ: CompressionType, level: Option, file: File) -> io::Result> { + let mut builder = Writer::builder(); + builder.compression_type(typ); + if let Some(level) = level { + builder.compression_level(level); + } + builder.build(file) +} + +fn create_sorter( + merge: MergeFn, + chunk_compression_type: CompressionType, + chunk_compression_level: Option, + chunk_fusing_shrink_size: Option, + max_nb_chunks: Option, + max_memory: Option, +) -> Sorter +{ + let mut builder = Sorter::builder(merge); + if let Some(shrink_size) = chunk_fusing_shrink_size { + builder.file_fusing_shrink_size(shrink_size); + } + builder.chunk_compression_type(chunk_compression_type); + if let Some(level) = chunk_compression_level { + builder.chunk_compression_level(level); + } + if let Some(nb_chunks) = max_nb_chunks { + builder.max_nb_chunks(nb_chunks); + } + if let Some(memory) = max_memory { + builder.max_memory(memory); + } 
+ builder.build() +} + +fn writer_into_reader(writer: Writer, shrink_size: Option) -> anyhow::Result> { + let mut file = writer.into_inner()?; + file.seek(SeekFrom::Start(0))?; + let file = if let Some(shrink_size) = shrink_size { + FileFuse::builder().shrink_size(shrink_size).build(file) + } else { + FileFuse::new(file) + }; + Reader::new(file).map_err(Into::into) +} + +fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { + let mut builder = Merger::builder(merge); + builder.extend(sources); + builder.build() +} + +fn merge_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + sources: Vec>, + merge: MergeFn, + method: WriteMethod, +) -> anyhow::Result<()> { + debug!("Merging {} MTBL stores...", sources.len()); + let before = Instant::now(); + + let merger = merge_readers(sources, merge); + let mut in_iter = merger.into_merge_iter()?; + + match method { + WriteMethod::Append => { + let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; + while let Some((k, v)) = in_iter.next()? { + out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; + } + }, + WriteMethod::GetMergePut => { + while let Some((k, v)) = in_iter.next()? { + match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { + Some(old_val) => { + let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; + let val = merge(k, &vals).expect("merge failed"); + database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? + }, + None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, + } + } + }, + } + + debug!("MTBL stores merged in {:.02?}!", before.elapsed()); + Ok(()) +} + +fn write_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + mut reader: Reader, + merge: MergeFn, + method: WriteMethod, +) -> anyhow::Result<()> { + debug!("Writing MTBL stores..."); + let before = Instant::now(); + + match method { + WriteMethod::Append => { + let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; + while let Some((k, v)) = reader.next()? { + out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; + } + }, + WriteMethod::GetMergePut => { + while let Some((k, v)) = reader.next()? { + match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { + Some(old_val) => { + let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; + let val = merge(k, &vals).expect("merge failed"); + database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? 
+ }, + None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, + } + } + } + } + + debug!("MTBL stores merged in {:.02?}!", before.elapsed()); + Ok(()) +} #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum IndexDocumentsMethod { @@ -14,12 +171,72 @@ pub enum IndexDocumentsMethod { pub struct IndexDocuments<'t, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index, + log_every_n: Option, + max_nb_chunks: Option, + max_memory: Option, + linked_hash_map_size: Option, + chunk_compression_type: CompressionType, + chunk_compression_level: Option, + chunk_fusing_shrink_size: Option, + indexing_jobs: Option, update_method: IndexDocumentsMethod, } impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { pub fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i> { - IndexDocuments { wtxn, index, update_method: IndexDocumentsMethod::ReplaceDocuments } + IndexDocuments { + wtxn, + index, + log_every_n: None, + max_nb_chunks: None, + max_memory: None, + linked_hash_map_size: None, + chunk_compression_type: CompressionType::None, + chunk_compression_level: None, + chunk_fusing_shrink_size: None, + indexing_jobs: None, + update_method: IndexDocumentsMethod::ReplaceDocuments + } + } + + pub(crate) fn log_every_n(&mut self, log_every_n: usize) -> &mut Self { + self.log_every_n = Some(log_every_n); + self + } + + pub(crate) fn max_nb_chunks(&mut self, max_nb_chunks: usize) -> &mut Self { + self.max_nb_chunks = Some(max_nb_chunks); + self + } + + pub(crate) fn max_memory(&mut self, max_memory: usize) -> &mut Self { + self.max_memory = Some(max_memory); + self + } + + pub(crate) fn linked_hash_map_size(&mut self, linked_hash_map_size: usize) -> &mut Self { + self.linked_hash_map_size = Some(linked_hash_map_size); + self + } + + pub(crate) fn chunk_compression_type(&mut self, chunk_compression_type: CompressionType) -> &mut Self { + self.chunk_compression_type = chunk_compression_type; + self + } + + pub(crate) fn chunk_compression_level(&mut self, chunk_compression_level: u32) -> &mut Self { + self.chunk_compression_level = Some(chunk_compression_level); + self + } + + pub(crate) fn chunk_fusing_shrink_size(&mut self, chunk_fusing_shrink_size: u64) -> &mut Self { + self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size); + self + } + + pub(crate) fn indexing_jobs(&mut self, indexing_jobs: usize) -> &mut Self { + self.indexing_jobs = Some(indexing_jobs); + self } pub fn index_documents_method(&mut self, method: IndexDocumentsMethod) -> &mut Self { @@ -27,7 +244,228 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { self } - pub fn execute(self) -> anyhow::Result<()> { - todo!() + pub fn execute(self, reader: R, progress_callback: F) -> anyhow::Result<()> + where + R: io::Read, + F: Fn(usize, usize) + Sync, + { + let before_indexing = Instant::now(); + + let transform = Transform { + rtxn: &self.wtxn, + index: self.index, + chunk_compression_type: self.chunk_compression_type, + chunk_compression_level: self.chunk_compression_level, + chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, + max_nb_chunks: self.max_nb_chunks, + max_memory: self.max_memory, + index_documents_method: self.update_method, + }; + + let TransformOutput { + fields_ids_map, + users_ids_documents_ids, + new_documents_ids, + replaced_documents_ids, + documents_count, + documents_file, + } = transform.from_csv(reader)?; + + // We delete the documents that this document addition replaces. This way we are + // able to simply insert all the documents even if they already exist in the database. 
+ if !replaced_documents_ids.is_empty() { + let update_builder = UpdateBuilder { + log_every_n: self.log_every_n, + max_nb_chunks: self.max_nb_chunks, + max_memory: self.max_memory, + linked_hash_map_size: self.linked_hash_map_size, + chunk_compression_type: self.chunk_compression_type, + chunk_compression_level: self.chunk_compression_level, + chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, + indexing_jobs: self.indexing_jobs, + }; + let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; + deletion_builder.delete_documents(&replaced_documents_ids); + let _deleted_documents_count = deletion_builder.execute()?; + } + + let mmap = unsafe { + memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")? + }; + let documents = grenad::Reader::new(mmap.as_ref())?; + + // The enum which indicates the type of the readers + // merges that are potentially done on different threads. + enum DatabaseType { + Main, + WordDocids, + WordsPairsProximitiesDocids, + } + + let linked_hash_map_size = self.linked_hash_map_size; + let max_nb_chunks = self.max_nb_chunks; + let max_memory = self.max_memory; + let chunk_compression_type = self.chunk_compression_type; + let chunk_compression_level = self.chunk_compression_level; + let log_every_n = self.log_every_n; + let chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; + + let jobs = self.indexing_jobs.unwrap_or(0); + let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; + + let (receiver, docid_word_positions_readers, documents_readers) = pool.install(|| { + let num_threads = rayon::current_num_threads(); + let max_memory_by_job = max_memory.map(|mm| mm / num_threads); + + let readers = rayon::iter::repeatn(documents, num_threads) + .enumerate() + .map(|(i, documents)| { + let store = Store::new( + linked_hash_map_size, + max_nb_chunks, + max_memory_by_job, + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + )?; + store.index( + documents, + documents_count, + i, + num_threads, + log_every_n, + &progress_callback, + ) + }) + .collect::, _>>()?; + + let mut main_readers = Vec::with_capacity(readers.len()); + let mut word_docids_readers = Vec::with_capacity(readers.len()); + let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); + let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); + let mut documents_readers = Vec::with_capacity(readers.len()); + readers.into_iter().for_each(|readers| { + main_readers.push(readers.main); + word_docids_readers.push(readers.word_docids); + docid_word_positions_readers.push(readers.docid_word_positions); + words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); + documents_readers.push(readers.documents); + }); + + // This is the function that merge the readers + // by using the given merge function. + let merge_readers = move |readers, merge| { + let mut writer = tempfile::tempfile().and_then(|f| { + create_writer(chunk_compression_type, chunk_compression_level, f) + })?; + let merger = merge_readers(readers, merge); + merger.write_into(&mut writer)?; + writer_into_reader(writer, chunk_fusing_shrink_size) + }; + + // The enum and the channel which is used to transfert + // the readers merges potentially done on another thread. 
+ let (sender, receiver) = sync_channel(3); + + debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); + rayon::spawn(move || { + vec![ + (DatabaseType::Main, main_readers, main_merge as MergeFn), + (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), + ( + DatabaseType::WordsPairsProximitiesDocids, + words_pairs_proximities_docids_readers, + words_pairs_proximities_docids_merge, + ), + ] + .into_par_iter() + .for_each(|(dbtype, readers, merge)| { + let result = merge_readers(readers, merge); + if let Err(e) = sender.send((dbtype, result)) { + error!("sender error: {}", e); + } + }); + }); + + Ok((receiver, docid_word_positions_readers, documents_readers)) as anyhow::Result<_> + })?; + + let mut documents_ids = self.index.documents_ids(self.wtxn)?; + let contains_documents = !documents_ids.is_empty(); + let write_method = if contains_documents { + WriteMethod::GetMergePut + } else { + WriteMethod::Append + }; + + // We write the fields ids map into the main database + self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; + + // We write the users_ids_documents_ids into the main database. + self.index.put_users_ids_documents_ids(self.wtxn, &users_ids_documents_ids)?; + + // We merge the new documents ids with the existing ones. + documents_ids.union_with(&new_documents_ids); + self.index.put_documents_ids(self.wtxn, &documents_ids)?; + + debug!("Writing the docid word positions into LMDB on disk..."); + merge_into_lmdb_database( + self.wtxn, + *self.index.docid_word_positions.as_polymorph(), + docid_word_positions_readers, + docid_word_positions_merge, + write_method + )?; + + debug!("Writing the documents into LMDB on disk..."); + merge_into_lmdb_database( + self.wtxn, + *self.index.documents.as_polymorph(), + documents_readers, + documents_merge, + write_method + )?; + + for (db_type, result) in receiver { + let content = result?; + match db_type { + DatabaseType::Main => { + debug!("Writing the main elements into LMDB on disk..."); + write_into_lmdb_database( + self.wtxn, + self.index.main, + content, + main_merge, + write_method, + )?; + }, + DatabaseType::WordDocids => { + debug!("Writing the words docids into LMDB on disk..."); + let db = *self.index.word_docids.as_polymorph(); + write_into_lmdb_database( + self.wtxn, + db, + content, + word_docids_merge, + write_method, + )?; + }, + DatabaseType::WordsPairsProximitiesDocids => { + debug!("Writing the words pairs proximities docids into LMDB on disk..."); + let db = *self.index.word_pair_proximity_docids.as_polymorph(); + write_into_lmdb_database( + self.wtxn, + db, + content, + words_pairs_proximities_docids_merge, + write_method, + )?; + }, + } + } + + info!("Update processed in {:.02?}", before_indexing.elapsed()); + + Ok(()) } } diff --git a/src/indexing/store.rs b/src/update/index_documents/store.rs similarity index 97% rename from src/indexing/store.rs rename to src/update/index_documents/store.rs index a6d2afa95..fba946820 100644 --- a/src/indexing/store.rs +++ b/src/update/index_documents/store.rs @@ -56,7 +56,7 @@ pub struct Store { impl Store { pub fn new( - linked_hash_map_size: usize, + linked_hash_map_size: Option, max_nb_chunks: Option, max_memory: Option, chunk_compression_type: CompressionType, @@ -66,6 +66,7 @@ impl Store { { // We divide the max memory by the number of sorter the Store have. 
let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 3)); + let linked_hash_map_size = linked_hash_map_size.unwrap_or(500); let main_sorter = create_sorter( main_merge, @@ -280,13 +281,13 @@ impl Store { pub fn index( mut self, mut documents: grenad::Reader<&[u8]>, - documents_count: u32, + documents_count: usize, thread_index: usize, num_threads: usize, - log_every_n: usize, + log_every_n: Option, mut progress_callback: F, ) -> anyhow::Result - where F: FnMut(u32, u32), + where F: FnMut(usize, usize), { debug!("{:?}: Indexing in a Store...", thread_index); @@ -301,9 +302,9 @@ impl Store { // We skip documents that must not be indexed by this thread. if count % num_threads == thread_index { // This is a log routine that we do every `log_every_n` documents. - if count % log_every_n == 0 { + if log_every_n.map_or(false, |len| count % len == 0) { info!("We have seen {} documents so far ({:.02?}).", format_count(count), before.elapsed()); - progress_callback(count as u32, documents_count); + progress_callback(count, documents_count); before = Instant::now(); } @@ -325,7 +326,7 @@ impl Store { count = count + 1; } - progress_callback(count as u32, documents_count); + progress_callback(count, documents_count); let readers = self.finish()?; debug!("{:?}: Store created!", thread_index); diff --git a/src/indexing/transform.rs b/src/update/index_documents/transform.rs similarity index 79% rename from src/indexing/transform.rs rename to src/update/index_documents/transform.rs index 901b12dbd..eb4c84fc6 100644 --- a/src/indexing/transform.rs +++ b/src/update/index_documents/transform.rs @@ -8,9 +8,10 @@ use fst::{IntoStreamer, Streamer}; use grenad::CompressionType; use roaring::RoaringBitmap; -use crate::FieldsIdsMap; +use crate::{BEU32, Index, FieldsIdsMap}; use crate::update::AvailableDocumentsIds; -use super::{create_writer, create_sorter}; +use super::merge_function::merge_two_obkv; +use super::{create_writer, create_sorter, IndexDocumentsMethod}; pub struct TransformOutput { pub fields_ids_map: FieldsIdsMap, @@ -21,41 +22,30 @@ pub struct TransformOutput { pub documents_file: File, } -pub struct Transform { - pub fields_ids_map: FieldsIdsMap, - pub available_documents_ids: AvailableDocumentsIds, - pub users_ids_documents_ids: fst::Map, +pub struct Transform<'t, 'i> { + pub rtxn: &'t heed::RoTxn, + pub index: &'i Index, pub chunk_compression_type: CompressionType, pub chunk_compression_level: Option, pub chunk_fusing_shrink_size: Option, pub max_nb_chunks: Option, pub max_memory: Option, + pub index_documents_method: IndexDocumentsMethod, } -fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec) { - use itertools::merge_join_by; - use itertools::EitherOrBoth::{Both, Left, Right}; - - buffer.clear(); - - let mut writer = obkv::KvWriter::new(buffer); - for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) { - match eob { - Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(), - } - } - - writer.finish().unwrap(); -} - -impl> Transform { +impl Transform<'_, '_> { /// Extract the users ids, deduplicate and compute the new internal documents ids /// and fields ids, writing all the documents under their internal ids into a final file. /// /// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids, /// the replaced documents ids, the number of documents in this update and the file /// containing all those documents. 
- pub fn from_csv(mut self, reader: R) -> anyhow::Result { + pub fn from_csv(self, reader: R) -> anyhow::Result { + let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; + let documents_ids = self.index.documents_ids(self.rtxn)?; + let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); + let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); + let mut csv = csv::Reader::from_reader(reader); let headers = csv.headers()?.clone(); let user_id_pos = headers.iter().position(|h| h == "id").context(r#"missing "id" header"#)?; @@ -63,7 +53,7 @@ impl> Transform { // Generate the new fields ids based on the current fields ids and this CSV headers. let mut fields_ids = Vec::new(); for header in headers.iter() { - let id = self.fields_ids_map.insert(header) + let id = fields_ids_map.insert(header) .context("impossible to generate a field id (limit reached)")?; fields_ids.push(id); } @@ -120,7 +110,7 @@ impl> Transform { let mut iter = sorter.into_iter()?; while let Some((user_id, update_obkv)) = iter.next()? { - let (docid, obkv) = match self.users_ids_documents_ids.get(user_id) { + let (docid, obkv) = match users_ids_documents_ids.get(user_id) { Some(docid) => { // If we find the user id in the current users ids documents ids map // we use it and insert it in the list of replaced documents. @@ -129,20 +119,22 @@ impl> Transform { // Depending on the update indexing method we will merge // the document update with the current document or not. - let must_merge_documents = false; - if must_merge_documents { - let base_obkv = todo!(); - let update_obkv = obkv::KvReader::new(update_obkv); - merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); - (docid, obkv_buffer.as_slice()) - } else { - (docid, update_obkv) + match self.index_documents_method { + IndexDocumentsMethod::ReplaceDocuments => (docid, update_obkv), + IndexDocumentsMethod::UpdateDocuments => { + let key = BEU32::new(docid); + let base_obkv = self.index.documents.get(&self.rtxn, &key)? + .context("document not found")?; + let update_obkv = obkv::KvReader::new(update_obkv); + merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); + (docid, obkv_buffer.as_slice()) + } } }, None => { // If this user id is new we add it to the users ids documents ids map // for new ids and into the list of new documents. - let new_docid = self.available_documents_ids.next() + let new_docid = available_documents_ids.next() .context("no more available documents ids")?; new_users_ids_documents_ids_builder.insert(user_id, new_docid as u64)?; new_documents_ids.insert(new_docid); @@ -163,7 +155,7 @@ impl> Transform { // We create the union between the existing users ids documents ids with the new ones. 
        let new_users_ids_documents_ids = new_users_ids_documents_ids_builder.into_map();
         let union_ = fst::map::OpBuilder::new()
-            .add(&self.users_ids_documents_ids)
+            .add(&users_ids_documents_ids)
             .add(&new_users_ids_documents_ids)
             .r#union();
 
@@ -176,7 +168,7 @@
         }
 
         Ok(TransformOutput {
-            fields_ids_map: self.fields_ids_map,
+            fields_ids_map,
             users_ids_documents_ids: users_ids_documents_ids_builder.into_map(),
             new_documents_ids,
             replaced_documents_ids,
diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs
index bbbc23619..293930bbf 100644
--- a/src/update/update_builder.rs
+++ b/src/update/update_builder.rs
@@ -1,35 +1,37 @@
-use std::borrow::Cow;
-use std::convert::TryFrom;
-
-use fst::{IntoStreamer, Streamer};
 use grenad::CompressionType;
-use itertools::Itertools;
-use roaring::RoaringBitmap;
 
-use crate::{Index, BEU32};
+use crate::Index;
 use super::clear_documents::ClearDocuments;
 use super::delete_documents::DeleteDocuments;
 use super::index_documents::IndexDocuments;
 
 pub struct UpdateBuilder {
-    log_every_n: usize,
-    max_nb_chunks: Option<usize>,
-    max_memory: usize,
-    linked_hash_map_size: usize,
-    chunk_compression_type: CompressionType,
-    chunk_compression_level: Option<u32>,
-    chunk_fusing_shrink_size: u64,
-    enable_chunk_fusing: bool,
-    indexing_jobs: Option<usize>,
+    pub(crate) log_every_n: Option<usize>,
+    pub(crate) max_nb_chunks: Option<usize>,
+    pub(crate) max_memory: Option<usize>,
+    pub(crate) linked_hash_map_size: Option<usize>,
+    pub(crate) chunk_compression_type: CompressionType,
+    pub(crate) chunk_compression_level: Option<u32>,
+    pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    pub(crate) indexing_jobs: Option<usize>,
 }
 
 impl UpdateBuilder {
     pub fn new() -> UpdateBuilder {
-        todo!()
+        UpdateBuilder {
+            log_every_n: None,
+            max_nb_chunks: None,
+            max_memory: None,
+            linked_hash_map_size: None,
+            chunk_compression_type: CompressionType::None,
+            chunk_compression_level: None,
+            chunk_fusing_shrink_size: None,
+            indexing_jobs: None,
+        }
     }
 
     pub fn log_every_n(&mut self, log_every_n: usize) -> &mut Self {
-        self.log_every_n = log_every_n;
+        self.log_every_n = Some(log_every_n);
         self
     }
 
@@ -39,12 +41,12 @@ impl UpdateBuilder {
     }
 
     pub fn max_memory(&mut self, max_memory: usize) -> &mut Self {
-        self.max_memory = max_memory;
+        self.max_memory = Some(max_memory);
         self
     }
 
     pub fn linked_hash_map_size(&mut self, linked_hash_map_size: usize) -> &mut Self {
-        self.linked_hash_map_size = linked_hash_map_size;
+        self.linked_hash_map_size = Some(linked_hash_map_size);
         self
     }
 
@@ -59,12 +61,7 @@ impl UpdateBuilder {
     }
 
     pub fn chunk_fusing_shrink_size(&mut self, chunk_fusing_shrink_size: u64) -> &mut Self {
-        self.chunk_fusing_shrink_size = chunk_fusing_shrink_size;
-        self
-    }
-
-    pub fn enable_chunk_fusing(&mut self, enable_chunk_fusing: bool) -> &mut Self {
-        self.enable_chunk_fusing = enable_chunk_fusing;
+        self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size);
         self
     }
 
@@ -97,6 +94,33 @@
         index: &'i Index,
     ) -> IndexDocuments<'t, 'u, 'i>
     {
-        IndexDocuments::new(wtxn, index)
+        let mut builder = IndexDocuments::new(wtxn, index);
+
+        if let Some(log_every_n) = self.log_every_n {
+            builder.log_every_n(log_every_n);
+        }
+        if let Some(max_nb_chunks) = self.max_nb_chunks {
+            builder.max_nb_chunks(max_nb_chunks);
+        }
+        if let Some(max_memory) = self.max_memory {
+            builder.max_memory(max_memory);
+        }
+        if let Some(linked_hash_map_size) = self.linked_hash_map_size {
+            builder.linked_hash_map_size(linked_hash_map_size);
+        }
+
+        builder.chunk_compression_type(self.chunk_compression_type);
+
+        if let Some(chunk_compression_level) = self.chunk_compression_level {
+            builder.chunk_compression_level(chunk_compression_level);
+        }
+        if let Some(chunk_fusing_shrink_size) = self.chunk_fusing_shrink_size {
+            builder.chunk_fusing_shrink_size(chunk_fusing_shrink_size);
+        }
+        if let Some(indexing_jobs) = self.indexing_jobs {
+            builder.indexing_jobs(indexing_jobs);
+        }
+
+        builder
     }
 }
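
Usage sketch (reviewer note, not part of the diff): the example below shows how a caller inside this crate might drive the new `UpdateBuilder` / `IndexDocuments` API introduced by this patch, mirroring the `serve.rs` route above. The `heed` environment handling, the option values, the `CompressionType::Snappy` choice, and the CSV path are illustrative assumptions; only the builder setters, `index_documents_method`, and `execute(reader, progress_callback)` come from this diff.

```rust
// Hypothetical helper, assumed to live inside this crate next to the HTTP route.
use std::fs::File;

use anyhow::Context;
use grenad::CompressionType;

use crate::Index;
use crate::update::{IndexDocumentsMethod, UpdateBuilder};

pub fn add_documents_from_csv(env: &heed::Env, index: &Index, csv_path: &str) -> anyhow::Result<()> {
    // Configure the builder much like the route does from its IndexerOpt values.
    let mut update_builder = UpdateBuilder::new();
    update_builder.log_every_n(100_000);                            // illustrative value
    update_builder.max_memory(2 * 1024 * 1024 * 1024);              // 2 GB, split across the indexing jobs
    update_builder.chunk_compression_type(CompressionType::Snappy); // assumes grenad exposes this variant

    // The whole documents addition runs inside a single write transaction
    // and is only made durable if `execute` succeeds.
    let mut wtxn = env.write_txn()?;
    let mut builder = update_builder.index_documents(&mut wtxn, index);
    builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments);

    let reader = File::open(csv_path).context("opening the CSV documents file")?;
    builder.execute(reader, |processed, total| {
        eprintln!("indexed {} documents out of {}", processed, total);
    })?;

    wtxn.commit()?;
    Ok(())
}
```

Choosing `IndexDocumentsMethod::UpdateDocuments` instead would merge each incoming document into the stored one through `merge_two_obkv`, as implemented in `Transform::from_csv`, rather than replacing it wholesale.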