diff --git a/Cargo.lock b/Cargo.lock index 73dee8722..4ad582fa5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,11 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "adler" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccc9a9dd069569f212bc4330af9f17c4afb5e8ce185e83dbb14f1349dda18b10" + [[package]] name = "anyhow" version = "1.0.31" @@ -153,6 +159,9 @@ name = "cc" version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -186,6 +195,21 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79bb3adfaf5f75d24b01aee375f7555907840fa2800e5ec8fa3b9e2031830173" +[[package]] +name = "crc32c" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ba37ef26c12988c1cee882d522d65e1d5d2ad8c3864665b88ee92767ed84c5" + +[[package]] +name = "crc32fast" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.3.2" @@ -323,6 +347,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "flate2" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" +dependencies = [ + "cfg-if", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -487,6 +523,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "h2" version = "0.2.5" @@ -710,6 +752,15 @@ dependencies = [ "libc", ] +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.40" @@ -750,12 +801,6 @@ version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" -[[package]] -name = "linked-hash-map" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" - [[package]] name = "lmdb-rkv-sys" version = "0.11.0" @@ -814,9 +859,9 @@ dependencies = [ "itertools", "jemallocator", "levenshtein_automata", - "linked-hash-map", "memmap", "once_cell", + "oxidized-mtbl", "rayon", "roaring", "serde", @@ -891,6 +936,15 @@ dependencies = [ "unicase 2.6.0", ] +[[package]] +name = "miniz_oxide" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.6.22" @@ -1021,6 +1075,18 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" +[[package]] +name = "oxidized-mtbl" +version = "0.1.0" +source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=9451be8#9451be8829562f7d1f8d34aa3ecb81c5106a0623" +dependencies = [ + "byteorder", + "crc32c", + "flate2", + "snap", + "zstd", +] + [[package]] name = "page_size" version = "0.4.2" @@ -1577,6 +1643,12 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" +[[package]] +name = "snap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da73c8f77aebc0e40c300b93f0a5f1bece7a248a36eee287d4e095f35c7b7d6e" + [[package]] name = "socket2" version = "0.3.12" @@ -2077,3 +2149,34 @@ dependencies = [ "syn", "synstructure", ] + +[[package]] +name = "zstd" +version = "0.5.3+zstd.1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b32eaf771efa709e8308605bbf9319bf485dc1503179ec0469b611937c0cd8" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "2.0.5+zstd.1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfb642e0d27f64729a639c52db457e0ae906e7bc6f5fe8f5c453230400f1055" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.4.17+zstd.1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b89249644df056b522696b1bb9e7c18c87e8ffa3e2f0dc3b0155875d6498f01b" +dependencies = [ + "cc", + "glob", + "itertools", + "libc", +] diff --git a/Cargo.toml b/Cargo.toml index 209854c0e..db126a2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,9 +16,9 @@ fxhash = "0.2.1" heed = { version = "0.8.1", default-features = false, features = ["lmdb"] } jemallocator = "0.3.2" levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] } -linked-hash-map = "0.5.3" memmap = "0.7.0" once_cell = "1.4.0" +oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" } rayon = "1.3.1" roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "deserialize-from-slice" } slice-group-by = "0.2.6" diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 07d1cd28d..f4fd57533 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -1,26 +1,25 @@ use std::collections::hash_map::Entry; -use std::collections::{HashMap, BTreeSet}; +use std::collections::{HashMap, BTreeSet, BTreeMap}; use std::convert::{TryFrom, TryInto}; -use std::hash::{Hash, BuildHasher}; -use std::io; -use std::iter::FromIterator; -use std::path::{Path, PathBuf}; +use std::fs::File; +use std::path::PathBuf; use std::time::Instant; -use anyhow::{ensure, Context}; -use fst::{Streamer, set::OpBuilder}; +use anyhow::Context; +use cow_utils::CowUtils; +use fst::{Streamer, IntoStreamer}; +use heed::EnvOpenOptions; use heed::types::*; -use heed::{Env, EnvOpenOptions}; -use rayon::prelude::*; +use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; use roaring::RoaringBitmap; use slice_group_by::StrGroupBy; use structopt::StructOpt; -use tempfile::TempDir; -use mega_mini_indexer::cache::ArcCache; -use mega_mini_indexer::{BEU32, Index, DocumentId, FastMap4}; +use mega_mini_indexer::{FastMap4, SmallVec32, Index, DocumentId, AttributeId}; + +const LMDB_MAX_KEY_LENGTH: usize = 512; +const ONE_MILLION: usize = 1_000_000; -const ONE_MILLION: u32 = 1_000_000; const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; @@ -41,64 +40,173 @@ struct Opt { #[structopt(long = "db", parse(from_os_str))] database: PathBuf, - /// The number of words that can fit in cache, the bigger this number is the less - /// the indexer will touch the databases on disk but the more it uses memory. - #[structopt(long, default_value = "100000")] - arc_cache_size: usize, - /// Number of parallel jobs, defaults to # of CPUs. #[structopt(short, long)] jobs: Option, - /// CSV file to index. - csv_file: PathBuf, + /// CSV file to index, if unspecified the CSV is read from standard input. + csv_file: Option, } -fn put_evicted_into_heed(wtxn: &mut heed::RwTxn, index: &Index, iter: I) -> anyhow::Result<()> -where - I: IntoIterator))> -{ - for (word, (positions, positions_docids)) in iter { - index.word_positions.put(wtxn, &word, &positions)?; +struct Indexed { + fst: fst::Set>, + postings_attrs: FastMap4, RoaringBitmap>, + postings_ids: FastMap4, FastMap4>, + headers: Vec, + documents: Vec<(DocumentId, Vec)>, +} - for (position, docids) in positions_docids { - let mut key = word.as_bytes().to_vec(); - key.extend_from_slice(&position.to_be_bytes()); - index.word_position_docids.put(wtxn, &key, &docids)?; +#[derive(Default)] +struct MtblKvStore(Option); + +impl MtblKvStore { + fn from_indexed(mut indexed: Indexed) -> anyhow::Result { + eprintln!("Creating an MTBL store from an Indexed..."); + + let outfile = tempfile::tempfile()?; + let mut out = Writer::new(outfile, None)?; + + out.add(b"\0headers", indexed.headers)?; + out.add(b"\0words-fst", indexed.fst.as_fst().as_bytes())?; + + // postings ids keys are all prefixed by a '1' + let mut key = vec![0]; + let mut buffer = Vec::new(); + + // We must write the postings attrs + key[0] = 1; + // We must write the postings ids in order for mtbl therefore + // we iterate over the fst to read the words in order + let mut stream = indexed.fst.stream(); + while let Some(word) = stream.next() { + if let Some(attrs) = indexed.postings_attrs.remove(word) { + key.truncate(1); + key.extend_from_slice(word); + // We serialize the attrs ids into a buffer + buffer.clear(); + attrs.serialize_into(&mut buffer)?; + // that we write under the generated key into MTBL + out.add(&key, &buffer).unwrap(); + } } - } - Ok(()) -} -fn merge_hashmaps(mut a: HashMap, mut b: HashMap, mut merge: F) -> HashMap -where - K: Hash + Eq, - S: BuildHasher, - F: FnMut(&K, &mut V, V) -{ - for (k, v) in a.iter_mut() { - if let Some(vb) = b.remove(k) { - (merge)(k, v, vb) + // We must write the postings ids + key[0] = 3; + // We must write the postings ids in order for mtbl therefore + // we iterate over the fst to read the words in order + let mut stream = indexed.fst.stream(); + while let Some(word) = stream.next() { + key.truncate(1); + key.extend_from_slice(word); + if let Some(attrs) = indexed.postings_ids.remove(word) { + let attrs: BTreeMap<_, _> = attrs.into_iter().collect(); + // We iterate over all the attributes containing the documents ids + for (attr, ids) in attrs { + // we postfix the word by the attribute id + key.extend_from_slice(&attr.to_be_bytes()); + // We serialize the document ids into a buffer + buffer.clear(); + ids.serialize_into(&mut buffer)?; + // that we write under the generated key into MTBL + out.add(&key, &buffer).unwrap(); + // And cleanup the attribute id afterward (u32 = 4 * u8) + key.truncate(key.len() - 4); + } + } + } + + // postings ids keys are all prefixed by a '4' + key[0] = 5; + indexed.documents.sort_unstable(); + for (id, content) in indexed.documents { + key.truncate(1); + key.extend_from_slice(&id.to_be_bytes()); + out.add(&key, content).unwrap(); + } + + let out = out.into_inner()?; + + eprintln!("MTBL store created!"); + Ok(MtblKvStore(Some(out))) + } + + fn merge(key: &[u8], values: &[Vec]) -> Option> { + if key == b"\0words-fst" { + let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect(); + + // Union of the two FSTs + let mut op = fst::set::OpBuilder::new(); + fsts.iter().for_each(|fst| op.push(fst.into_stream())); + let op = op.r#union(); + + let mut build = fst::SetBuilder::memory(); + build.extend_stream(op.into_stream()).unwrap(); + Some(build.into_inner().unwrap()) + } + else if key == b"\0headers" { + assert!(values.windows(2).all(|vs| vs[0] == vs[1])); + Some(values[0].to_vec()) + } + // We either merge postings attrs, prefix postings or postings ids. + else if key[0] == 1 || key[0] == 2 || key[0] == 3 || key[0] == 4 { + let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap(); + + for value in &values[1..] { + let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap(); + first.union_with(&bitmap); + } + + let mut vec = Vec::new(); + first.serialize_into(&mut vec).unwrap(); + Some(vec) + } + else if key[0] == 5 { + assert!(values.windows(2).all(|vs| vs[0] == vs[1])); + Some(values[0].to_vec()) + } + else { + panic!("wut? {:?}", key) } } - a.extend(b); + fn from_many(stores: Vec, mut f: F) -> anyhow::Result<()> + where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> + { + eprintln!("Merging {} MTBL stores...", stores.len()); - a + let mmaps: Vec<_> = stores.iter().flat_map(|m| { + m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() }) + }).collect(); + + let sources = mmaps.iter().map(|mmap| { + Reader::new(&mmap, ReaderOptions::default()).unwrap() + }).collect(); + + let opt = MergerOptions { merge: MtblKvStore::merge }; + let mut merger = Merger::new(sources, opt); + + let mut iter = merger.iter(); + while let Some((k, v)) = iter.next() { + (f)(k, v)?; + } + + eprintln!("MTBL stores merged!"); + Ok(()) + } } -fn index_csv( - wtxn: &mut heed::RwTxn, - mut rdr: csv::Reader, - index: &Index, - arc_cache_size: usize, - num_threads: usize, +fn index_csv( + mut rdr: csv::Reader, thread_index: usize, -) -> anyhow::Result<()> + num_threads: usize, +) -> anyhow::Result> { - eprintln!("Indexing into LMDB..."); + eprintln!("{:?}: Indexing into an Indexed...", thread_index); - let mut words_cache = ArcCache::<_, (RoaringBitmap, FastMap4<_, RoaringBitmap>)>::new(arc_cache_size); + let mut document = csv::StringRecord::new(); + let mut postings_attrs = FastMap4::default(); + let mut postings_ids = FastMap4::default(); + let mut documents = Vec::new(); // Write the headers into a Vec of bytes. let headers = rdr.headers()?; @@ -106,92 +214,102 @@ fn index_csv( writer.write_byte_record(headers.as_byte_record())?; let headers = writer.into_inner()?; - let mut document_id = 0usize; - let mut before = Instant::now(); - let mut document = csv::StringRecord::new(); - + let mut document_id: usize = 0; while rdr.read_record(&mut document)? { document_id = document_id + 1; - let document_id = DocumentId::try_from(document_id).context("Generated id is too big")?; - if thread_index == 0 && document_id % ONE_MILLION == 0 { - eprintln!("Document {}m just processed ({:.02?} elapsed).", document_id / ONE_MILLION, before.elapsed()); - before = Instant::now(); + // We skip documents that must not be indexed by this thread + if document_id % num_threads != thread_index { continue } + + let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; + + if document_id % (ONE_MILLION as u32) == 0 { + eprintln!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32); } for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { - if !word.is_empty() && word.len() < 500 { // LMDB limits - let word = word.to_lowercase(); // TODO cow_to_lowercase - let position = (attr * 1000 + pos) as u32; + if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH { + let word = word.cow_to_lowercase(); + let position = (attr * MAX_POSITION + pos) as u32; - // If this indexing process is not concerned by this word, then ignore it. - if fxhash::hash32(&word) as usize % num_threads != thread_index { continue; } + // We save the positions where this word has been seen. + postings_attrs.entry(SmallVec32::from(word.as_bytes())) + .or_insert_with(RoaringBitmap::new).insert(position); - match words_cache.get_mut(&word) { - (Some(entry), evicted) => { - let (ids, positions) = entry; - ids.insert(position); - positions.entry(position).or_default().insert(document_id); - put_evicted_into_heed(wtxn, index, evicted)?; - }, - (None, _evicted) => { - let mut key = word.as_bytes().to_vec(); - key.extend_from_slice(&position.to_be_bytes()); - - let mut words_positions = index.word_positions.get(wtxn, &word)?.unwrap_or_default(); - let mut words_position_docids = index.word_position_docids.get(wtxn, &key)?.unwrap_or_default(); - - words_positions.insert(position); - words_position_docids.insert(document_id); - - let mut map = FastMap4::default(); - map.insert(position, words_position_docids); - let value = (words_positions, map); - - let evicted = words_cache.insert(word.clone(), value, |(pa, pda), (pb, pdb)| { - (pa | pb, merge_hashmaps(pda, pdb, |_, a, b| RoaringBitmap::union_with(a, &b))) - }); - - put_evicted_into_heed(wtxn, index, evicted)?; - } - } + // We save the documents ids under the position and word we have seen it. + postings_ids.entry(SmallVec32::from(word.as_bytes())) + .or_insert_with(FastMap4::default).entry(position) // positions + .or_insert_with(RoaringBitmap::new).insert(document_id); // document ids } } } - if document_id as usize % num_threads == thread_index { - // We write the document in the database. - let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); - writer.write_byte_record(document.as_byte_record())?; - let document = writer.into_inner()?; - index.documents.put(wtxn, &BEU32::new(document_id), &document)?; - } + // We write the document in the database. + let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); + writer.write_byte_record(document.as_byte_record())?; + let document = writer.into_inner()?; + documents.push((document_id, document)); } - put_evicted_into_heed(wtxn, index, words_cache)?; - // We store the words from the postings. let mut new_words = BTreeSet::default(); - let iter = index.word_positions.as_polymorph().iter::<_, Str, DecodeIgnore>(wtxn)?; - for result in iter { - let (word, ()) = result?; - new_words.insert(word); + for (word, _new_ids) in &postings_ids { + new_words.insert(word.clone()); } - let new_words_fst = fst::Set::from_iter(new_words)?; + let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; - index.put_fst(wtxn, &new_words_fst)?; - index.put_headers(wtxn, &headers)?; + let indexed = Indexed { fst: new_words_fst, headers, postings_attrs, postings_ids, documents }; + eprintln!("{:?}: Indexed created!", thread_index); - let before = Instant::now(); - compute_words_attributes_docids(wtxn, index)?; - eprintln!("Computing the attributes documents ids took {:.02?}.", before.elapsed()); + MtblKvStore::from_indexed(indexed).map(|x| vec![x]) +} + +// TODO merge with the previous values +fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> { + if key == b"\0words-fst" { + // Write the words fst + index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?; + } + else if key == b"\0headers" { + // Write the headers + index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; + } + else if key.starts_with(&[1]) { + // Write the postings lists + index.word_positions.as_polymorph() + .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; + } + else if key.starts_with(&[2]) { + // Write the prefix postings lists + index.prefix_word_positions.as_polymorph() + .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; + } + else if key.starts_with(&[3]) { + // Write the postings lists + index.word_position_docids.as_polymorph() + .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; + } + else if key.starts_with(&[4]) { + // Write the prefix postings lists + index.prefix_word_position_docids.as_polymorph() + .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; + } + else if key.starts_with(&[5]) { + // Write the documents + index.documents.as_polymorph() + .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; + } Ok(()) } fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> anyhow::Result<()> { + let before = Instant::now(); + + eprintln!("Computing the attributes documents ids..."); + let fst = match index.fst(&wtxn)? { Some(fst) => fst.map_data(|s| s.to_vec())?, None => return Ok(()), @@ -209,7 +327,7 @@ fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> any let key_pos = key_pos.try_into().map(u32::from_be_bytes)?; // If the key corresponds to the word (minus the attribute) if key.len() == word.len() + 4 { - let attribute = key_pos / 1000; + let attribute = key_pos / MAX_POSITION as u32; match word_attributes.entry(attribute) { Entry::Vacant(entry) => { entry.insert(docids); }, Entry::Occupied(mut entry) => entry.get_mut().union_with(&docids), @@ -226,249 +344,11 @@ fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> any } } - Ok(()) -} - -use std::collections::binary_heap::{BinaryHeap, PeekMut}; -use std::cmp::{Ordering, Reverse}; - -// ------------ Value - -struct Value<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, -{ - iter: heed::RoIter<'t, KC, DC>, - value: Option>, -} - -impl<'t, KC, DC> Value<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, -{ - fn new(mut iter: heed::RoIter<'t, KC, DC>) -> Option> { - iter.next().map(|value| Value { iter, value: Some(value) }) - } - - fn peek_value(&mut self) -> Option> { - std::mem::replace(&mut self.value, self.iter.next()) - } -} - -impl<'t, KC, DC> Ord for Value<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, - KC::DItem: Ord, -{ - fn cmp(&self, other: &Self) -> Ordering { - let a = self.value.as_ref().unwrap(); - let b = other.value.as_ref().unwrap(); - match (a, b) { - (Ok((a, _)), Ok((b, _))) => a.cmp(&b), - (Err(_), Err(_)) => Ordering::Equal, - (Err(_), _) => Ordering::Less, - (_, Err(_)) => Ordering::Greater, - } - } -} - -impl<'t, KC, DC> Eq for Value<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, - KC::DItem: Ord, -{ } - -impl<'t, KC, DC> PartialEq for Value<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, - KC::DItem: Ord, -{ - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - -impl<'t, KC, DC> PartialOrd for Value<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, - KC::DItem: Ord, -{ - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -// ------------ - -struct MergeIter<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, -{ - iters: BinaryHeap>>, -} - -impl<'t, KC, DC> MergeIter<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, - KC::DItem: Ord, -{ - fn new(iters: Vec>) -> MergeIter<'t, KC, DC> { - let iters = iters.into_iter().filter_map(Value::new).map(Reverse).collect(); - MergeIter { iters } - } -} - -impl<'t, KC, DC> Iterator for MergeIter<'t, KC, DC> -where - KC: heed::BytesDecode<'t>, - DC: heed::BytesDecode<'t>, - KC::DItem: Ord, -{ - type Item = heed::Result<(KC::DItem, DC::DItem)>; - - fn next(&mut self) -> Option { - let mut peek = self.iters.peek_mut()?; - let result = peek.0.peek_value().unwrap(); - - if peek.0.value.is_none() { - PeekMut::pop(peek); - } - - Some(result) - } -} - -fn merge_databases( - others: Vec<(TempDir, Env, Index)>, - wtxn: &mut heed::RwTxn, - index: &Index, -) -> anyhow::Result<()> -{ - eprintln!("Merging the temporary databases..."); - - let rtxns: Result, _> = others.iter().map(|(_, env, _)| env.read_txn()).collect(); - let rtxns = rtxns?; - - // merge the word positions - let sources: Result, _> = others.iter().zip(&rtxns).map(|((.., i), t)| i.word_positions.iter(t)).collect(); - let sources = sources?; - let mut dest = index.word_positions.iter_mut(wtxn)?; - let before = Instant::now(); - for result in MergeIter::new(sources) { - let (k, v) = result?; - dest.append(&k, &v)?; - } - eprintln!("Merging the word_positions database took {:.02?}.", before.elapsed()); - drop(dest); - - // merge the word position documents ids - let sources: Result, _> = others.iter().zip(&rtxns).map(|((.., i), t)| i.word_position_docids.iter(t)).collect(); - let sources = sources?; - let mut dest = index.word_position_docids.iter_mut(wtxn)?; - let before = Instant::now(); - for result in MergeIter::new(sources) { - let (k, v) = result?; - dest.append(&k, &v)?; - } - eprintln!("Merging the word_position_docids database took {:.02?}.", before.elapsed()); - drop(dest); - - // merge the word attribute documents ids - let sources: Result, _> = others.iter().zip(&rtxns).map(|((.., i), t)| i.word_attribute_docids.iter(t)).collect(); - let sources = sources?; - let mut dest = index.word_attribute_docids.iter_mut(wtxn)?; - - let before = Instant::now(); - let mut current = None as Option<(&[u8], RoaringBitmap)>; - for result in MergeIter::new(sources) { - let (k, v) = result?; - match current.as_mut() { - Some((ck, cv)) if ck == &k => cv.union_with(&v), - Some((ck, cv)) => { - dest.append(&ck, &cv)?; - current = Some((k, v)); - }, - None => current = Some((k, v)), - }; - } - - if let Some((ck, cv)) = current.take() { - dest.append(&ck, &cv)?; - } - - eprintln!("Merging the word_attribute_docids database took {:.02?}.", before.elapsed()); - drop(dest); - - // merge the documents - let sources: Result, _> = others.iter().zip(&rtxns).map(|((.., i), t)| { - i.documents.as_polymorph().iter::<_, ByteSlice, ByteSlice>(t) - }).collect(); - let sources = sources?; - let mut dest = index.documents.as_polymorph().iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - let before = Instant::now(); - for result in MergeIter::new(sources) { - let (k, v) = result?; - dest.append(&k, &v)?; - } - eprintln!("Merging the documents database took {:.02?}.", before.elapsed()); - drop(dest); - - let mut fsts = Vec::new(); - for ((_dir, _env, oindex), rtxn) in others.into_iter().zip(&rtxns) { - // merge and check the headers are equal - let headers = oindex.headers(&rtxn)?.context("A database is missing the headers")?; - match index.headers(wtxn)? { - Some(h) => ensure!(h == headers, "headers are not equal"), - None => index.put_headers(wtxn, &headers)?, - }; - - // retrieve the FSTs to merge them together in one run. - let fst = oindex.fst(&rtxn)?.context("A database is missing its FST")?; - let fst = fst.map_data(|s| s.to_vec())?; - fsts.push(fst); - } - - let before = Instant::now(); - - // Merge all the FSTs to create a final one and write it in the final database. - if let Some(fst) = index.fst(wtxn)? { - let fst = fst.map_data(|s| s.to_vec())?; - fsts.push(fst); - } - - let builder = OpBuilder::from_iter(&fsts); - let op = builder.r#union(); - let mut builder = fst::set::SetBuilder::memory(); - builder.extend_stream(op)?; - let fst = builder.into_set(); - - index.put_fst(wtxn, &fst)?; - - eprintln!("Merging the FSTs took {:.02?}.", before.elapsed()); + eprintln!("Computing the attributes documents ids took {:.02?}.", before.elapsed()); Ok(()) } -fn open_env_index(path: impl AsRef) -> anyhow::Result<(Env, Index)> { - let env = EnvOpenOptions::new() - .map_size(100 * 1024 * 1024 * 1024) // 100 GB - .max_readers(10) - .max_dbs(10) - .open(path)?; - - let index = Index::new(&env)?; - - Ok((env, index)) -} - fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); @@ -477,32 +357,35 @@ fn main() -> anyhow::Result<()> { } std::fs::create_dir_all(&opt.database)?; - let (env, index) = open_env_index(&opt.database)?; + let env = EnvOpenOptions::new() + .map_size(100 * 1024 * 1024 * 1024) // 100 GB + .max_readers(10) + .max_dbs(10) + .open(opt.database)?; + let index = Index::new(&env)?; + + // We duplicate the file # CPU times let num_threads = rayon::current_num_threads(); + let file = opt.csv_file.unwrap(); + let csv_readers: Vec<_> = (0..num_threads).map(|_| csv::Reader::from_path(&file)).collect::>()?; - let result: anyhow::Result<_> = - (0..num_threads).into_par_iter().map(|i| { - let dir = tempfile::tempdir()?; - let (env, index) = open_env_index(&dir)?; + let stores: Vec<_> = csv_readers + .into_iter() + .enumerate() + .map(|(i, rdr)| index_csv(rdr, i, num_threads)) + .collect::>()?; - let mut wtxn = env.write_txn()?; - let rdr = csv::Reader::from_path(&opt.csv_file)?; - index_csv(&mut wtxn, rdr, &index, opt.arc_cache_size, num_threads, i)?; - - wtxn.commit()?; - - Ok((dir, env, index)) - }) - .collect(); + let stores: Vec<_> = stores.into_iter().flatten().collect(); + eprintln!("We are writing into LMDB..."); let mut wtxn = env.write_txn()?; - let parts = result?; - merge_databases(parts, &mut wtxn, &index)?; + + MtblKvStore::from_many(stores, |k, v| writer(&mut wtxn, &index, k, v))?; + compute_words_attributes_docids(&mut wtxn, &index)?; let count = index.documents.len(&wtxn)?; wtxn.commit()?; - eprintln!("Wrote {} documents into LMDB", count); Ok(()) diff --git a/src/cache.rs b/src/cache.rs deleted file mode 100644 index b09890be0..000000000 --- a/src/cache.rs +++ /dev/null @@ -1,486 +0,0 @@ -// MIT License -// Copyright (c) 2016 Jerome Froelich - -use std::borrow::Borrow; -use std::fmt; -use std::hash::{Hash, Hasher}; -use std::mem; -use std::ptr; -use std::usize; - -use std::collections::HashMap; - -use crate::FastMap8; - -// Struct used to hold a reference to a key -#[doc(hidden)] -pub struct KeyRef { - k: *const K, -} - -impl Hash for KeyRef { - fn hash(&self, state: &mut H) { - unsafe { (*self.k).hash(state) } - } -} - -impl PartialEq for KeyRef { - fn eq(&self, other: &KeyRef) -> bool { - unsafe { (*self.k).eq(&*other.k) } - } -} - -impl Eq for KeyRef {} - -impl Borrow for KeyRef { - fn borrow(&self) -> &K { - unsafe { &*self.k } - } -} - -// Struct used to hold a key value pair. Also contains references to previous and next entries -// so we can maintain the entries in a linked list ordered by their use. -struct LruEntry { - key: mem::MaybeUninit, - val: mem::MaybeUninit, - prev: *mut LruEntry, - next: *mut LruEntry, -} - -impl LruEntry { - fn new(key: K, val: V) -> Self { - LruEntry { - key: mem::MaybeUninit::new(key), - val: mem::MaybeUninit::new(val), - prev: ptr::null_mut(), - next: ptr::null_mut(), - } - } - - fn new_sigil() -> Self { - LruEntry { - key: mem::MaybeUninit::uninit(), - val: mem::MaybeUninit::uninit(), - prev: ptr::null_mut(), - next: ptr::null_mut(), - } - } -} - -/// An LRU Cache. -pub struct LruCache { - map: FastMap8, Box>>, - cap: usize, - - // head and tail are sigil nodes to faciliate inserting entries - head: *mut LruEntry, - tail: *mut LruEntry, -} - -impl LruCache { - /// Creates a new LRU Cache that holds at most `cap` items. - pub fn new(cap: usize) -> LruCache { - let mut map = FastMap8::default(); - map.reserve(cap); - LruCache::construct(cap, map) - } - - /// Creates a new LRU Cache that never automatically evicts items. - pub fn unbounded() -> LruCache { - LruCache::construct(usize::MAX, HashMap::default()) - } -} - -impl LruCache { - /// Creates a new LRU Cache with the given capacity. - fn construct(cap: usize, map: FastMap8, Box>>) -> LruCache { - // NB: The compiler warns that cache does not need to be marked as mutable if we - // declare it as such since we only mutate it inside the unsafe block. - let cache = LruCache { - map, - cap, - head: Box::into_raw(Box::new(LruEntry::new_sigil())), - tail: Box::into_raw(Box::new(LruEntry::new_sigil())), - }; - - unsafe { - (*cache.head).next = cache.tail; - (*cache.tail).prev = cache.head; - } - - cache - } - - /// Puts a key-value pair into cache. If the capacity is reached the evicted entry is returned. - pub fn insert(&mut self, k: K, mut v: V) -> Option<(K, V)> { - let node_ptr = self.map.get_mut(&KeyRef { k: &k }).map(|node| { - let node_ptr: *mut LruEntry = &mut **node; - node_ptr - }); - - match node_ptr { - Some(node_ptr) => { - // if the key is already in the cache just update its value and move it to the - // front of the list - unsafe { mem::swap(&mut v, &mut (*(*node_ptr).val.as_mut_ptr()) as &mut V) } - self.detach(node_ptr); - self.attach(node_ptr); - None - } - None => { - let (mut node, old_entry) = if self.len() == self.cap() { - // if the cache is full, remove the last entry so we can use it for the new key - let old_key = KeyRef { - k: unsafe { &(*(*(*self.tail).prev).key.as_ptr()) }, - }; - let mut old_node = self.map.remove(&old_key).unwrap(); - - // extract the node's current key and val so we can overwrite them - let old_entry = unsafe { (old_node.key.assume_init(), old_node.val.assume_init()) }; - - old_node.key = mem::MaybeUninit::new(k); - old_node.val = mem::MaybeUninit::new(v); - - let node_ptr: *mut LruEntry = &mut *old_node; - self.detach(node_ptr); - - (old_node, Some(old_entry)) - } else { - // if the cache is not full allocate a new LruEntry - (Box::new(LruEntry::new(k, v)), None) - }; - - let node_ptr: *mut LruEntry = &mut *node; - self.attach(node_ptr); - - let keyref = unsafe { (*node_ptr).key.as_ptr() }; - self.map.insert(KeyRef { k: keyref }, node); - - old_entry - } - } - } - - /// Returns a mutable reference to the value of the key in the cache or `None` if it - /// is not present in the cache. Moves the key to the head of the LRU list if it exists. - pub fn get_mut<'a, Q>(&'a mut self, k: &Q) -> Option<&'a mut V> - where - KeyRef: Borrow, - Q: Hash + Eq + ?Sized, - { - if let Some(node) = self.map.get_mut(k) { - let node_ptr: *mut LruEntry = &mut **node; - - self.detach(node_ptr); - self.attach(node_ptr); - - Some(unsafe { &mut (*(*node_ptr).val.as_mut_ptr()) as &mut V }) - } else { - None - } - } - - /// Returns a bool indicating whether the given key is in the cache. Does not update the - /// LRU list. - pub fn contains_key(&self, k: &Q) -> bool - where - KeyRef: Borrow, - Q: Hash + Eq + ?Sized, - { - self.map.contains_key(k) - } - - /// Removes and returns the value corresponding to the key from the cache or - /// `None` if it does not exist. - pub fn remove(&mut self, k: &Q) -> Option - where - KeyRef: Borrow, - Q: Hash + Eq + ?Sized, - { - match self.map.remove(&k) { - None => None, - Some(mut old_node) => { - let node_ptr: *mut LruEntry = &mut *old_node; - self.detach(node_ptr); - unsafe { Some(old_node.val.assume_init()) } - } - } - } - - /// Removes and returns the key and value corresponding to the least recently - /// used item or `None` if the cache is empty. - pub fn remove_lru(&mut self) -> Option<(K, V)> { - let node = self.remove_last()?; - // N.B.: Can't destructure directly because of https://github.com/rust-lang/rust/issues/28536 - let node = *node; - let LruEntry { key, val, .. } = node; - unsafe { Some((key.assume_init(), val.assume_init())) } - } - - /// Returns the number of key-value pairs that are currently in the the cache. - pub fn len(&self) -> usize { - self.map.len() - } - - /// Returns a bool indicating whether the cache is empty or not. - pub fn is_empty(&self) -> bool { - self.map.len() == 0 - } - - /// Returns the maximum number of key-value pairs the cache can hold. - pub fn cap(&self) -> usize { - self.cap - } - - /// Resizes the cache. If the new capacity is smaller than the size of the current - /// cache any entries past the new capacity are discarded. - pub fn resize(&mut self, cap: usize) { - // return early if capacity doesn't change - if cap == self.cap { - return; - } - - while self.map.len() > cap { - self.remove_last(); - } - self.map.shrink_to_fit(); - - self.cap = cap; - } - - /// Clears the contents of the cache. - pub fn clear(&mut self) { - loop { - match self.remove_last() { - Some(_) => (), - None => break, - } - } - } - - fn remove_last(&mut self) -> Option>> { - let prev; - unsafe { prev = (*self.tail).prev } - if prev != self.head { - let old_key = KeyRef { - k: unsafe { &(*(*(*self.tail).prev).key.as_ptr()) }, - }; - let mut old_node = self.map.remove(&old_key).unwrap(); - let node_ptr: *mut LruEntry = &mut *old_node; - self.detach(node_ptr); - Some(old_node) - } else { - None - } - } - - fn detach(&mut self, node: *mut LruEntry) { - unsafe { - (*(*node).prev).next = (*node).next; - (*(*node).next).prev = (*node).prev; - } - } - - fn attach(&mut self, node: *mut LruEntry) { - unsafe { - (*node).next = (*self.head).next; - (*node).prev = self.head; - (*self.head).next = node; - (*(*node).next).prev = node; - } - } -} - -impl Drop for LruCache { - fn drop(&mut self) { - self.map.values_mut().for_each(|e| unsafe { - ptr::drop_in_place(e.key.as_mut_ptr()); - ptr::drop_in_place(e.val.as_mut_ptr()); - }); - // We rebox the head/tail, and because these are maybe-uninit - // they do not have the absent k/v dropped. - unsafe { - let _head = *Box::from_raw(self.head); - let _tail = *Box::from_raw(self.tail); - } - } -} - -impl IntoIterator for LruCache { - type Item = (K, V); - type IntoIter = IntoIter; - - fn into_iter(mut self) -> IntoIter { - let map = mem::replace(&mut self.map, FastMap8::default()); - IntoIter { iter: map.into_iter() } - } -} - -// The compiler does not automatically derive Send and Sync for LruCache because it contains -// raw pointers. The raw pointers are safely encapsulated by LruCache though so we can -// implement Send and Sync for it below. -unsafe impl Send for LruCache {} -unsafe impl Sync for LruCache {} - -impl fmt::Debug for LruCache { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("LruCache") - .field("len", &self.len()) - .field("cap", &self.cap()) - .finish() - } -} - -/// An iterator over the entries of a `LruCache`. -pub struct IntoIter { - iter: std::collections::hash_map::IntoIter, Box>>, -} - -impl Iterator for IntoIter { - type Item = (K, V); - - fn next(&mut self) -> Option<(K, V)> { - match self.iter.next() { - Some((_, node)) => { - let LruEntry { key, val, .. } = *node; - unsafe { Some((key.assume_init(), val.assume_init())) } - }, - None => None, - } - } -} - -pub struct ArcCache -where - K: Eq + Hash, -{ - recent_set: LruCache, - recent_evicted: LruCache, - frequent_set: LruCache, - frequent_evicted: LruCache, - capacity: usize, - p: usize, -} - -impl ArcCache -where - K: Eq + Hash + Clone, -{ - pub fn new(capacity: usize) -> ArcCache { - assert_ne!(capacity, 0, "cache length cannot be zero"); - ArcCache { - recent_set: LruCache::new(capacity), - recent_evicted: LruCache::new(capacity), - frequent_set: LruCache::new(capacity), - frequent_evicted: LruCache::new(capacity), - capacity: capacity, - p: 0, - } - } - - pub fn insert(&mut self, key: K, value: V, mut merge: F) -> Vec<(K, V)> - where F: FnMut(V, V) -> V - { - let mut evicted = Vec::new(); - if self.frequent_set.contains_key(&key) { - evicted.extend(self.frequent_set.insert(key, value)); - return evicted; - } - if let Some(prev_value) = self.recent_set.remove(&key) { - let value = (merge)(prev_value, value); - evicted.extend(self.frequent_set.insert(key, value)); - return evicted; - } - if self.frequent_evicted.contains_key(&key) { - let recent_evicted_len = self.recent_evicted.len(); - let frequent_evicted_len = self.frequent_evicted.len(); - let delta = if recent_evicted_len > frequent_evicted_len { - recent_evicted_len / frequent_evicted_len - } else { - 1 - }; - if delta < self.p { - self.p -= delta; - } else { - self.p = 0 - } - if self.recent_set.len() + self.frequent_set.len() >= self.capacity { - evicted.extend(self.replace(true)); - } - self.frequent_evicted.remove(&key); - evicted.extend(self.frequent_set.insert(key, value)); - return evicted; - } - if self.recent_evicted.contains_key(&key) { - let recent_evicted_len = self.recent_evicted.len(); - let frequent_evicted_len = self.frequent_evicted.len(); - let delta = if frequent_evicted_len > recent_evicted_len { - frequent_evicted_len / recent_evicted_len - } else { - 1 - }; - if delta <= self.capacity - self.p { - self.p += delta; - } else { - self.p = self.capacity; - } - if self.recent_set.len() + self.frequent_set.len() >= self.capacity { - evicted.extend(self.replace(false)); - } - self.recent_evicted.remove(&key); - evicted.extend(self.frequent_set.insert(key, value)); - return evicted; - } - let mut evicted = Vec::with_capacity(2); - if self.recent_set.len() + self.frequent_set.len() >= self.capacity { - evicted.extend(self.replace(false)); - } - if self.recent_evicted.len() > self.capacity - self.p { - self.recent_evicted.remove_lru(); - } - if self.frequent_evicted.len() > self.p { - self.frequent_evicted.remove_lru(); - } - evicted.extend(self.recent_set.insert(key, value)); - evicted - } - - pub fn get_mut(&mut self, key: &K) -> (Option<&mut V>, Option<(K, V)>) - where - K: Clone + Hash + Eq, - { - let evicted = match self.recent_set.remove(key) { - Some(value) => self.frequent_set.insert(key.clone(), value), - None => None, - }; - (self.frequent_set.get_mut(key), evicted) - } - - fn replace(&mut self, frequent_evicted_contains_key: bool) -> Option<(K, V)> { - let recent_set_len = self.recent_set.len(); - if recent_set_len > 0 - && (recent_set_len > self.p - || (recent_set_len == self.p && frequent_evicted_contains_key)) - { - if let Some((old_key, old_val)) = self.recent_set.remove_lru() { - self.recent_evicted.insert(old_key.clone(), ()); - return Some((old_key, old_val)); - } - } else { - if let Some((old_key, old_val)) = self.frequent_set.remove_lru() { - self.frequent_evicted.insert(old_key.clone(), ()); - return Some((old_key, old_val)); - } - } - None - } -} - -impl IntoIterator for ArcCache{ - type Item = (K, V); - type IntoIter = std::iter::Chain, IntoIter>; - - fn into_iter(self) -> Self::IntoIter { - self.recent_set.into_iter().chain(self.frequent_set) - } -} diff --git a/src/lib.rs b/src/lib.rs index a4eecbcaf..43451e3b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ mod best_proximity; mod heed_codec; mod iter_shortest_paths; mod query_tokens; -pub mod cache; use std::borrow::Cow; use std::collections::HashMap;