diff --git a/.gitignore b/.gitignore
index ea8c4bf7f..43ec51292 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
 /target
+*.csv
+*.mmdb
diff --git a/Cargo.lock b/Cargo.lock
index 554a7dfea..8d9370fe8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -21,6 +21,14 @@
 version = "1.0.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f"
 
+[[package]]
+name = "arc-cache"
+version = "0.2.4"
+source = "git+https://github.com/Kerollmops/rust-arc-cache.git?rev=56530f2#56530f2d219823f8f88dc03851f8fe057bd72564"
+dependencies = [
+ "xlru-cache",
+]
+
 [[package]]
 name = "arc-swap"
 version = "0.4.6"
@@ -892,6 +900,12 @@
 version = "0.2.70"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f"
 
+[[package]]
+name = "linked-hash-map"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
+
 [[package]]
 name = "lmdb-rkv-sys"
 version = "0.11.0"
@@ -963,6 +977,7 @@
 name = "milli"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arc-cache",
  "askama",
  "askama_warp",
  "bitpacking",
@@ -1200,12 +1215,14 @@
 checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
 
 [[package]]
 name = "oxidized-mtbl"
 version = "0.1.0"
-source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=9451be8#9451be8829562f7d1f8d34aa3ecb81c5106a0623"
+source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=6b8a3a8#6b8a3a83a8b83bfdba38f7ea67bfa5868e668741"
 dependencies = [
  "byteorder",
  "crc32c",
  "flate2",
+ "memmap",
  "snap",
+ "tempfile",
  "zstd",
 ]
@@ -2340,6 +2357,14 @@
 dependencies = [
  "winapi-build",
 ]
 
+[[package]]
+name = "xlru-cache"
+version = "0.1.2"
+source = "git+https://github.com/Kerollmops/rust-xlru-cache.git?rev=3c90f49#3c90f49e11758ee0cc4ff145b2606ba143188b77"
+dependencies = [
+ "linked-hash-map",
+]
+
 [[package]]
 name = "zerocopy"
 version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index 4dbbbd776..e11e31158 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,7 +18,8 @@
 jemallocator = "0.3.2"
 levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] }
 memmap = "0.7.0"
 once_cell = "1.4.0"
-oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" }
+oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "6b8a3a8" }
+arc-cache = { git = "https://github.com/Kerollmops/rust-arc-cache.git", rev = "56530f2" }
 rayon = "1.3.1"
 roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" }
 slice-group-by = "0.2.6"
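The oxidized-mtbl bump is what enables the rest of this diff: the pinned rev adds a `Sorter` that accepts inserts in any order, merges duplicate keys with a caller-provided function, and spills sorted, compressed chunks to disk (hence the new `memmap` and `tempfile` entries in its lockfile dependencies). A minimal sketch of that API, assuming rev `6b8a3a8` behaves exactly as the calls in this diff use it; the `concat` merge function here is our own stand-in:

```rust
use oxidized_mtbl::{CompressionType, Sorter};

type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;

// Called whenever the same key was inserted more than once.
fn concat(_key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
    Ok(values.concat())
}

fn main() -> anyhow::Result<()> {
    let mut sorter = Sorter::builder(concat as MergeFn)
        .chunk_compression_type(CompressionType::Snappy)
        .build();

    // Inserts may arrive unordered; full chunks are sorted and spilled to disk.
    for (key, val) in [("beta", "2"), ("alpha", "1"), ("alpha", "bis")].iter() {
        sorter.insert(key.as_bytes(), val.as_bytes())?;
    }

    // Iteration comes back lexicographically ordered, duplicates merged.
    let mut iter = sorter.into_iter()?;
    while let Some(result) = iter.next() {
        let (key, val) = result?;
        println!("{:?} => {:?}", key, val);
    }
    Ok(())
}
```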
diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index c89e6e7ea..a03b1741a 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -1,24 +1,26 @@
 use std::collections::hash_map::Entry;
-use std::collections::{HashMap, BTreeSet};
+use std::collections::HashMap;
 use std::convert::{TryFrom, TryInto};
 use std::fs::File;
-use std::mem;
+use std::iter::FromIterator;
 use std::path::PathBuf;
 use std::time::Instant;
 
 use anyhow::Context;
+use arc_cache::ArcCache;
 use cow_utils::CowUtils;
 use fst::{Streamer, IntoStreamer};
 use heed::EnvOpenOptions;
 use heed::types::*;
 use log::debug;
-use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions};
+use memmap::Mmap;
+use oxidized_mtbl::{Reader, Writer, Merger, Sorter, CompressionType};
 use rayon::prelude::*;
 use roaring::RoaringBitmap;
 use slice_group_by::StrGroupBy;
 use structopt::StructOpt;
 
-use milli::{FastMap4, SmallVec32, Index, DocumentId, Position};
+use milli::{SmallVec32, Index, DocumentId, Position};
 
 const LMDB_MAX_KEY_LENGTH: usize = 512;
 const ONE_MILLION: usize = 1_000_000;
@@ -26,6 +28,9 @@
 const MAX_POSITION: usize = 1000;
 const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
 
+const HEADERS_KEY: &[u8] = b"\0headers";
+const WORDS_FST_KEY: &[u8] = b"\x06words-fst";
+
 #[cfg(target_os = "linux")]
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
@@ -47,14 +52,6 @@ struct Opt {
     #[structopt(short, long)]
     jobs: Option<usize>,
 
-    /// Maximum number of bytes to allocate, will be divided by the number of
-    /// cores used. It is recommended to set a maximum of half of the available memory
-    /// as the current measurement method is really bad.
-    ///
-    /// The minumum amount of memory used will be 50MB anyway.
-    #[structopt(long, default_value = "4294967296")]
-    max_memory_usage: usize,
-
     /// Verbose mode (-v, -vv, -vvv, etc.)
     #[structopt(short, long, parse(from_occurrences))]
     verbose: usize,
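The `--max-memory-usage` flag (whose own docs admitted the measurement was rough) can go away because memory is now bounded structurally: each of the two caches in the `Store` introduced below holds at most 65,535 hot entries, and everything evicted is immediately flushed into the disk-backed `Sorter`. The cache-with-merge pattern is the heart of it; here is a tiny sketch, assuming the pinned `rust-arc-cache` rev exposes the `insert(key, value, merge)` signature exactly as the `Store` methods below call it:

```rust
use arc_cache::ArcCache;
use roaring::RoaringBitmap;
use std::iter::FromIterator;

fn main() {
    // Deliberately tiny capacity so evictions show up immediately.
    let mut cache: ArcCache<String, RoaringBitmap> = ArcCache::new(2);

    for (word, position) in [("hello", 0u32), ("world", 1), ("hello", 2)].iter() {
        let positions = RoaringBitmap::from_iter(Some(*position));
        // On a hit the closure unions the new bitmap into the cached one; on a
        // miss the entry is inserted and the evicted least-recently-used
        // entries are handed back, ready to be flushed into the Sorter.
        let (_, evicted) = cache.insert(word.to_string(), positions, |old, new| old.union_with(&new));
        for (key, bitmap) in evicted {
            println!("spill {:?} -> {:?}", key, bitmap);
        }
    }
}
```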
@@ -63,285 +60,178 @@
     csv_file: Option<PathBuf>,
 }
 
-struct Indexed {
-    fst: fst::Set<Vec<u8>>,
-    word_positions: FastMap4<SmallVec32<u8>, RoaringBitmap>,
-    word_position_docids: FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>,
-    headers: Vec<u8>,
-    documents: Vec<(DocumentId, Vec<u8>)>,
+type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>;
+
+struct Store {
+    word_positions: ArcCache<SmallVec32<u8>, RoaringBitmap>,
+    word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>,
+    sorter: Sorter<MergeFn>,
 }
 
-impl Indexed {
-    fn new(
-        word_positions: FastMap4<SmallVec32<u8>, RoaringBitmap>,
-        word_position_docids: FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>,
-        headers: Vec<u8>,
-        documents: Vec<(DocumentId, Vec<u8>)>,
-    ) -> anyhow::Result<Indexed>
-    {
-        // We store the words from the postings.
-        let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect();
-        let fst = fst::Set::from_iter(new_words)?;
-        Ok(Indexed { fst, headers, word_positions, word_position_docids, documents })
+impl Store {
+    fn new() -> Store {
+        let sorter = Sorter::builder(merge as MergeFn)
+            .chunk_compression_type(CompressionType::Snappy)
+            .build();
+
+        Store {
+            word_positions: ArcCache::new(65_535),
+            word_position_docids: ArcCache::new(65_535),
+            sorter,
+        }
     }
-}
 
-#[derive(Default)]
-struct MtblKvStore(Option<File>);
+    // Save the positions where this word has been seen.
+    pub fn insert_word_position(&mut self, word: &str, position: Position) -> anyhow::Result<()> {
+        let word = SmallVec32::from(word.as_bytes());
+        let position = RoaringBitmap::from_iter(Some(position));
+        let (_, lrus) = self.word_positions.insert(word, position, |old, new| old.union_with(&new));
+        Self::write_word_positions(&mut self.sorter, lrus)
+    }
 
-impl MtblKvStore {
-    fn from_indexed(mut indexed: Indexed) -> anyhow::Result<MtblKvStore> {
-        debug!("Creating an MTBL store from an Indexed...");
+    // Save the documents ids under the position and word we have seen it.
+    pub fn insert_word_position_docid(&mut self, word: &str, position: Position, id: DocumentId) -> anyhow::Result<()> {
+        let word = SmallVec32::from(word.as_bytes());
+        let ids = RoaringBitmap::from_iter(Some(id));
+        let (_, lrus) = self.word_position_docids.insert((word, position), ids, |old, new| old.union_with(&new));
+        Self::write_word_position_docids(&mut self.sorter, lrus)
+    }
 
-        let outfile = tempfile::tempfile()?;
-        let mut out = Writer::new(outfile, None)?;
+    pub fn write_headers(&mut self, headers: &[u8]) -> anyhow::Result<()> {
+        Ok(self.sorter.insert(HEADERS_KEY, headers)?)
+    }
 
-        out.add(b"\0headers", indexed.headers)?;
-        out.add(b"\0words-fst", indexed.fst.as_fst().as_bytes())?;
+    pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> {
+        let id = id.to_be_bytes();
+        let mut key = Vec::with_capacity(1 + id.len());
+        // postings ids keys are all prefixed by a '5'
+        key.push(5);
+        key.extend_from_slice(&id);
+
+        Ok(self.sorter.insert(&key, content)?)
+    }
+
+    fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
+    where I: IntoIterator<Item=(SmallVec32<u8>, RoaringBitmap)>
+    {
         // postings ids keys are all prefixed by a '1'
-        let mut key = vec![0];
+        let mut key = vec![1];
         let mut buffer = Vec::new();
 
-        // We must write the postings attrs
-        key[0] = 1;
-        // We must write the postings ids in order for mtbl therefore
-        // we iterate over the fst to read the words in order
-        let mut stream = indexed.fst.stream();
-        while let Some(word) = stream.next() {
-            if let Some(positions) = indexed.word_positions.get(word) {
-                key.truncate(1);
-                key.extend_from_slice(word);
-                // We serialize the positions into a buffer
-                buffer.clear();
-                positions.serialize_into(&mut buffer)?;
-                // that we write under the generated key into MTBL
-                out.add(&key, &buffer).unwrap();
-            }
-        }
-
-        // We must write the postings ids
-        key[0] = 3;
-        // We must write the postings ids in order for mtbl therefore
-        // we iterate over the fst to read the words in order
-        let mut stream = indexed.fst.stream();
-        while let Some(word) = stream.next() {
+        for (word, positions) in iter {
             key.truncate(1);
-            key.extend_from_slice(word);
-            if let Some(positions) = indexed.word_positions.remove(word) {
-                // We iterate over all the attributes containing the documents ids
-                for pos in positions {
-                    let ids = indexed.word_position_docids.remove(&(SmallVec32::from(word), pos)).unwrap();
-                    // we postfix the word by the positions it appears in
-                    let position_bytes = pos.to_be_bytes();
-                    key.extend_from_slice(&position_bytes);
-                    // We serialize the document ids into a buffer
-                    buffer.clear();
-                    ids.serialize_into(&mut buffer)?;
-                    // that we write under the generated key into MTBL
-                    out.add(&key, &buffer).unwrap();
-                    // And cleanup the position afterward
-                    key.truncate(key.len() - position_bytes.len());
-                }
-            }
+            key.extend_from_slice(&word);
+            // We serialize the positions into a buffer
+            buffer.clear();
+            positions.serialize_into(&mut buffer)?;
+            // that we write under the generated key into MTBL
+            sorter.insert(&key, &buffer)?;
         }
 
-        // postings ids keys are all prefixed
-        key[0] = 5;
-        indexed.documents.sort_unstable_by_key(|(id, _)| *id);
-        for (id, content) in indexed.documents {
-            key.truncate(1);
-            key.extend_from_slice(&id.to_be_bytes());
-            out.add(&key, content).unwrap();
-        }
-
-        let out = out.into_inner()?;
-
-        debug!("MTBL store created!");
-        Ok(MtblKvStore(Some(out)))
-    }
-
-    fn merge(key: &[u8], values: &[Vec<u8>]) -> Option<Vec<u8>> {
-        if key == b"\0words-fst" {
-            let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect();
-
-            // Union of the two FSTs
-            let mut op = fst::set::OpBuilder::new();
-            fsts.iter().for_each(|fst| op.push(fst.into_stream()));
-            let op = op.r#union();
-
-            let mut build = fst::SetBuilder::memory();
-            build.extend_stream(op.into_stream()).unwrap();
-            Some(build.into_inner().unwrap())
-        }
-        else if key == b"\0headers" {
-            assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
-            Some(values[0].to_vec())
-        }
-        // We either merge postings attrs, prefix postings or postings ids.
-        else if key[0] == 1 || key[0] == 2 || key[0] == 3 || key[0] == 4 {
-            let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
-
-            for value in &values[1..] {
-                let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap();
-                first.union_with(&bitmap);
-            }
-
-            let mut vec = Vec::new();
-            first.serialize_into(&mut vec).unwrap();
-            Some(vec)
-        }
-        else if key[0] == 5 {
-            assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
-            Some(values[0].to_vec())
-        }
-        else {
-            panic!("wut? {:?}", key)
-        }
-    }
-
-    fn from_many<F>(stores: Vec<MtblKvStore>, mut f: F) -> anyhow::Result<()>
-    where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()>
-    {
-        debug!("Merging {} MTBL stores...", stores.len());
-        let before = Instant::now();
-
-        let mmaps: Vec<_> = stores.iter().flat_map(|m| {
-            m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() })
-        }).collect();
-
-        let sources = mmaps.iter().map(|mmap| {
-            Reader::new(&mmap, ReaderOptions::default()).unwrap()
-        }).collect();
-
-        let opt = MergerOptions { merge: MtblKvStore::merge };
-        let mut merger = Merger::new(sources, opt);
-
-        let mut iter = merger.iter();
-        while let Some((k, v)) = iter.next() {
-            (f)(k, v)?;
-        }
-
-        debug!("MTBL stores merged in {:.02?}!", before.elapsed());
         Ok(())
     }
-}
 
-fn mem_usage(
-    word_positions: &FastMap4<SmallVec32<u8>, RoaringBitmap>,
-    word_position_docids: &FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>,
-    documents: &Vec<(u32, Vec<u8>)>,
-) -> usize
-{
-    use std::mem::size_of;
+    fn write_word_position_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
+    where I: IntoIterator<Item=((SmallVec32<u8>, Position), RoaringBitmap)>
+    {
+        // postings positions ids keys are all prefixed by a '3'
+        let mut key = vec![3];
+        let mut buffer = Vec::new();
 
-    let documents =
-        documents.iter().map(|(_, d)| d.capacity()).sum::<usize>()
-        + documents.capacity() * size_of::<(Position, Vec<u8>)>();
-
-    let word_positions =
-        word_positions.iter().map(|(k, r)| {
-            (if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage()
-        }).sum::<usize>()
-        + word_positions.capacity() * size_of::<(SmallVec32<u8>, RoaringBitmap)>();
-
-    let word_position_docids =
-        word_position_docids.iter().map(|((k, _), r)| {
-            (if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage()
-        }).sum::<usize>()
-        + word_position_docids.capacity() * size_of::<((SmallVec32<u8>, Position), RoaringBitmap)>();
-
-    documents + word_positions + word_position_docids
-}
-
-fn index_csv(
-    mut rdr: csv::Reader<File>,
-    thread_index: usize,
-    num_threads: usize,
-    max_mem_usage: usize,
-) -> anyhow::Result<Vec<MtblKvStore>>
-{
-    debug!("{:?}: Indexing into an Indexed...", thread_index);
-
-    let mut stores = Vec::new();
-
-    let mut word_positions = FastMap4::default();
-    let mut word_position_docids = FastMap4::default();
-    let mut documents = Vec::new();
-
-    // Write the headers into a Vec of bytes.
-    let headers = rdr.headers()?;
-    let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-    writer.write_byte_record(headers.as_byte_record())?;
-    let headers = writer.into_inner()?;
-
-    let mut document_id: usize = 0;
-    let mut document = csv::StringRecord::new();
-    while rdr.read_record(&mut document)? {
-        document_id = document_id + 1;
-
-        // We skip documents that must not be indexed by this thread
-        if document_id % num_threads != thread_index { continue }
-
-        let document_id = DocumentId::try_from(document_id).context("generated id is too big")?;
-
-        if document_id % (ONE_MILLION as u32) == 0 {
-            debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32);
+        for ((word, pos), ids) in iter {
+            key.truncate(1);
+            key.extend_from_slice(&word);
+            // we postfix the word by the positions it appears in
+            let position_bytes = pos.to_be_bytes();
+            key.extend_from_slice(&position_bytes);
+            // We serialize the document ids into a buffer
+            buffer.clear();
+            ids.serialize_into(&mut buffer)?;
+            // that we write under the generated key into MTBL
+            sorter.insert(&key, &buffer)?;
+            // And cleanup the position afterward
+            key.truncate(key.len() - position_bytes.len());
        }
 
-        for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
-            for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
-                if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH {
-                    let word = word.cow_to_lowercase();
-                    let position = (attr * MAX_POSITION + pos) as u32;
-
-                    // We save the positions where this word has been seen.
-                    word_positions.entry(SmallVec32::from(word.as_bytes()))
-                        .or_insert_with(RoaringBitmap::new).insert(position);
-
-                    // We save the documents ids under the position and word we have seen it.
-                    word_position_docids.entry((SmallVec32::from(word.as_bytes()), position)) // word + position
-                        .or_insert_with(RoaringBitmap::new).insert(document_id); // document ids
-                }
-            }
-        }
-
-        // We write the document in the database.
-        let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new());
-        writer.write_byte_record(document.as_byte_record())?;
-        let document = writer.into_inner()?;
-        documents.push((document_id, document));
-
-        if documents.len() % 100_000 == 0 {
-            let usage = mem_usage(&word_positions, &word_position_docids, &documents);
-            if usage > max_mem_usage {
-                debug!("Whoops too much memory used ({}B).", usage);
-
-                let word_positions = mem::take(&mut word_positions);
-                let word_position_docids = mem::take(&mut word_position_docids);
-                let documents = mem::take(&mut documents);
-
-                let indexed = Indexed::new(word_positions, word_position_docids, headers.clone(), documents)?;
-                debug!("{:?}: Indexed created!", thread_index);
-                stores.push(MtblKvStore::from_indexed(indexed)?);
-            }
-        }
+        Ok(())
     }
 
-    let indexed = Indexed::new(word_positions, word_position_docids, headers, documents)?;
-    debug!("{:?}: Indexed created!", thread_index);
-    stores.push(MtblKvStore::from_indexed(indexed)?);
+    pub fn finish(mut self) -> anyhow::Result<Reader<Mmap>> {
+        Self::write_word_positions(&mut self.sorter, self.word_positions)?;
+        Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?;
 
-    Ok(stores)
+        let mut wtr = tempfile::tempfile().map(Writer::new)?;
+        let mut builder = fst::SetBuilder::memory();
+
+        let mut iter = self.sorter.into_iter()?;
+        while let Some(result) = iter.next() {
+            let (key, val) = result?;
+            if let Some((&1, word)) = key.split_first() {
+                // This is a lexicographically ordered word position
+                // we use the key to construct the words fst.
+                builder.insert(word)?;
+            }
+            wtr.insert(key, val)?;
+        }
+
+        let fst = builder.into_set();
+        wtr.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?;
+
+        let file = wtr.into_inner()?;
+        let mmap = unsafe { Mmap::map(&file)? };
+        let reader = Reader::new(mmap)?;
+
+        Ok(reader)
+    }
+}
+
+fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    if key == WORDS_FST_KEY {
+        let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect();
+
+        // Union of the two FSTs
+        let mut op = fst::set::OpBuilder::new();
+        fsts.iter().for_each(|fst| op.push(fst.into_stream()));
+        let op = op.r#union();
+
+        let mut build = fst::SetBuilder::memory();
+        build.extend_stream(op.into_stream()).unwrap();
+        Ok(build.into_inner().unwrap())
+    }
+    else if key == HEADERS_KEY {
+        assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
+        Ok(values[0].to_vec())
+    }
+    // We either merge postings attrs, prefix postings or postings ids.
+    else if key[0] == 1 || key[0] == 2 || key[0] == 3 || key[0] == 4 {
+        let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
+
+        for value in &values[1..] {
+            let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap();
+            first.union_with(&bitmap);
+        }
+
+        let mut vec = Vec::new();
+        first.serialize_into(&mut vec).unwrap();
+        Ok(vec)
+    }
+    else if key[0] == 5 {
+        assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
+        Ok(values[0].to_vec())
+    }
+    else {
+        panic!("wut? {:?}", key)
+    }
 }
 
 // TODO merge with the previous values
-fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
-    if key == b"\0words-fst" {
+fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
+    if key == WORDS_FST_KEY {
         // Write the words fst
         index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?;
     }
-    else if key == b"\0headers" {
+    else if key == HEADERS_KEY {
         // Write the headers
         index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
     }
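With everything funneled into a single sorted key space, the sentinel keys matter: MTBL writers require ascending keys, and `finish` can only emit the words fst after it has streamed every prefix-`1` entry to build it, which is why `WORDS_FST_KEY` moved from `b"\0words-fst"` to `b"\x06words-fst"` (0x06 sorts after every other prefix, while `b"\0headers"` still sorts first). A summary of the layout, with a hypothetical helper mirroring how `write_word_position_docids` assembles a prefix-`3` key:

```rust
// Key space used by the Store (prefixes taken from the diff):
//
//   b"\0headers"                 -> the CSV headers, sorts before everything
//   [1] ++ word                  -> RoaringBitmap of the word's positions
//   [3] ++ word ++ position_be   -> RoaringBitmap of document ids
//   [5] ++ document_id_be        -> the raw CSV record
//   b"\x06words-fst"             -> the words fst, sorts after everything

// Hypothetical helper, not part of the diff.
fn word_position_docids_key(word: &str, position: u32) -> Vec<u8> {
    let mut key = vec![3];
    key.extend_from_slice(word.as_bytes());
    key.extend_from_slice(&position.to_be_bytes());
    key
}

fn main() {
    let key = word_position_docids_key("hello", 2);
    assert_eq!(key[0], 3);
    // Big-endian positions keep keys for the same word contiguous and ordered.
    assert!(key < word_position_docids_key("hello", 3));
}
```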
{:?}", key) + } } // TODO merge with the previous values -fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> { - if key == b"\0words-fst" { +fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> { + if key == WORDS_FST_KEY { // Write the words fst index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?; } - else if key == b"\0headers" { + else if key == HEADERS_KEY { // Write the headers index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; } @@ -374,6 +264,81 @@ fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyh Ok(()) } +fn merge_into_lmdb(sources: Vec>, mut f: F) -> anyhow::Result<()> +where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> +{ + debug!("Merging {} MTBL stores...", sources.len()); + let before = Instant::now(); + + let mut builder = Merger::builder(merge); + builder.extend(sources); + let merger = builder.build(); + + let mut iter = merger.into_merge_iter()?; + while let Some(result) = iter.next() { + let (k, v) = result?; + (f)(&k, &v)?; + } + + debug!("MTBL stores merged in {:.02?}!", before.elapsed()); + Ok(()) +} + +fn index_csv( + mut rdr: csv::Reader, + thread_index: usize, + num_threads: usize, +) -> anyhow::Result> +{ + debug!("{:?}: Indexing into an Indexed...", thread_index); + + let mut store = Store::new(); + + // Write the headers into a Vec of bytes and then into the store. + let headers = rdr.headers()?; + let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); + writer.write_byte_record(headers.as_byte_record())?; + let headers = writer.into_inner()?; + store.write_headers(&headers)?; + + let mut document_id: usize = 0; + let mut document = csv::StringRecord::new(); + while rdr.read_record(&mut document)? { + document_id = document_id + 1; + + // We skip documents that must not be indexed by this thread + if document_id % num_threads != thread_index { continue } + + let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; + + if document_id % (ONE_MILLION as u32) == 0 { + debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32); + } + + for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { + for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { + if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH { + let word = word.cow_to_lowercase(); + let position = (attr * MAX_POSITION + pos) as u32; + store.insert_word_position(&word, position)?; + store.insert_word_position_docid(&word, position, document_id)?; + } + } + } + + // We write the document in the database. + let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); + writer.write_byte_record(document.as_byte_record())?; + let document = writer.into_inner()?; + store.write_document(document_id, &document)?; + } + + let reader = store.finish()?; + debug!("{:?}: Store created!", thread_index); + Ok(reader) +} + +// TODO do that in the threads. fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> anyhow::Result<()> { let before = Instant::now(); @@ -441,7 +406,6 @@ fn main() -> anyhow::Result<()> { let index = Index::new(&env)?; let num_threads = rayon::current_num_threads(); - let max_memory_usage = (opt.max_memory_usage / num_threads).max(50 * 1024 * 1024); // 50MB // We duplicate the file # jobs times. 
@@ -441,7 +406,6 @@ fn main() -> anyhow::Result<()> {
     let index = Index::new(&env)?;
 
     let num_threads = rayon::current_num_threads();
-    let max_memory_usage = (opt.max_memory_usage / num_threads).max(50 * 1024 * 1024); // 50MB
 
     // We duplicate the file # jobs times.
     let file = opt.csv_file.unwrap();
@@ -450,15 +414,13 @@
     let stores: Vec<_> = csv_readers
         .into_par_iter()
         .enumerate()
-        .map(|(i, rdr)| index_csv(rdr, i, num_threads, max_memory_usage))
+        .map(|(i, rdr)| index_csv(rdr, i, num_threads))
         .collect::<Result<_, _>>()?;
 
-    let stores: Vec<_> = stores.into_iter().flatten().collect();
-
     debug!("We are writing into LMDB...");
     let mut wtxn = env.write_txn()?;
 
-    MtblKvStore::from_many(stores, |k, v| writer(&mut wtxn, &index, k, v))?;
+    merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
     compute_words_attributes_docids(&mut wtxn, &index)?;
     let count = index.documents.len(&wtxn)?;
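After the parallel map, `merge_into_lmdb` reconciles the per-thread stores: keys that can only come from one thread (headers, prefix-5 documents) are asserted identical across sources, while the bitmap-valued classes (prefixes 1 through 4) are unioned, exactly as the `merge` function above does. A standalone sanity check of that bitmap branch, reusing the same roaring calls the diff uses:

```rust
use roaring::RoaringBitmap;
use std::iter::FromIterator;

fn main() -> anyhow::Result<()> {
    // Two stores saw the same word in different documents.
    let a = RoaringBitmap::from_iter(vec![1u32, 3]);
    let b = RoaringBitmap::from_iter(vec![2u32, 3]);

    // Serialize both, as they would sit in two MTBL values for one key.
    let mut values: Vec<Vec<u8>> = Vec::new();
    for bitmap in [&a, &b].iter() {
        let mut buf = Vec::new();
        bitmap.serialize_into(&mut buf)?;
        values.push(buf);
    }

    // The merge rule for prefixes 1 through 4: deserialize, union, reserialize.
    let mut first = RoaringBitmap::deserialize_from(values[0].as_slice())?;
    for value in &values[1..] {
        first.union_with(&RoaringBitmap::deserialize_from(value.as_slice())?);
    }
    assert_eq!(first, RoaringBitmap::from_iter(vec![1u32, 2, 3]));
    Ok(())
}
```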